Skip to content

Commit b974c39

Browse files
committed
refactor: update metrics tracking to be non-global
1 parent faa929c commit b974c39

File tree

8 files changed

+247
-307
lines changed

8 files changed

+247
-307
lines changed

Cargo.lock

+156 -200
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+7 -1
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ hex = "0.4"
3838
iroh-base = { version = "0.34", features = ["ticket"] }
3939
iroh-blobs = { version = "0.34" }
4040
iroh-gossip = { version = "0.34", optional = true, features = ["net"] }
41-
iroh-metrics = { version = "0.32", default-features = false }
41+
iroh-metrics = { version = "0.33", default-features = false }
4242
iroh = { version = "0.34", optional = true }
4343
num_enum = "0.7"
4444
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
@@ -117,3 +117,9 @@ rpc = [
117117
[package.metadata.docs.rs]
118118
all-features = true
119119
rustdoc-args = ["--cfg", "iroh_docsrs"]
120+
121+
[patch.crates-io]
122+
iroh = { git = "https://github.com/n0-computer/iroh", branch = "Frando/metrics2" }
123+
iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "Frando/metrics" }
124+
iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "refactor/metrics2" }
125+
iroh-metrics = { git = "https://github.com/n0-computer/iroh-metrics", branch = "Frando/derive-metricsgroupset" }

src/actor.rs

+16 -2
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@ use anyhow::{anyhow, Context, Result};
1212
use bytes::Bytes;
1313
use futures_util::FutureExt;
1414
use iroh_blobs::Hash;
15-
use iroh_metrics::inc;
1615
use serde::{Deserialize, Serialize};
1716
use tokio::{sync::oneshot, task::JoinSet};
1817
use tracing::{debug, error, error_span, trace, warn};
@@ -224,6 +223,7 @@ struct OpenReplica {
224223
pub struct SyncHandle {
225224
tx: async_channel::Sender<Action>,
226225
join_handle: Arc<Option<JoinHandle<()>>>,
226+
metrics: Arc<Metrics>,
227227
}
228228

229229
/// Options when opening a replica.
@@ -255,13 +255,15 @@ impl SyncHandle {
255255
content_status_callback: Option<ContentStatusCallback>,
256256
me: String,
257257
) -> SyncHandle {
258+
let metrics = Arc::new(Metrics::default());
258259
let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
259260
let actor = Actor {
260261
store,
261262
states: Default::default(),
262263
action_rx,
263264
content_status_callback,
264265
tasks: Default::default(),
266+
metrics: metrics.clone(),
265267
};
266268
let join_handle = std::thread::Builder::new()
267269
.name("sync-actor".to_string())
@@ -278,9 +280,15 @@ impl SyncHandle {
278280
SyncHandle {
279281
tx: action_tx,
280282
join_handle,
283+
metrics,
281284
}
282285
}
283286

287+
/// Returns the metrics collected in this sync actor.
288+
pub fn metrics(&self) -> &Arc<Metrics> {
289+
&self.metrics
290+
}
291+
284292
pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
285293
let (reply, rx) = oneshot::channel();
286294
let action = ReplicaAction::Open { reply, opts };
@@ -599,6 +607,7 @@ struct Actor {
599607
action_rx: async_channel::Receiver<Action>,
600608
content_status_callback: Option<ContentStatusCallback>,
601609
tasks: JoinSet<()>,
610+
metrics: Arc<Metrics>,
602611
}
603612

604613
impl Actor {
@@ -634,7 +643,7 @@ impl Actor {
634643
}
635644
};
636645
trace!(%action, "tick");
637-
inc!(Metrics, actor_tick_main);
646+
self.metrics.actor_tick_main.inc();
638647
match action {
639648
Action::Shutdown { reply } => {
640649
break reply;
@@ -750,6 +759,8 @@ impl Actor {
750759
let author = get_author(&mut this.store, &author)?;
751760
let mut replica = this.states.replica(namespace, &mut this.store)?;
752761
replica.insert(&key, &author, hash, len)?;
762+
this.metrics.new_entries_local.inc();
763+
this.metrics.new_entries_local_size.inc_by(len);
753764
Ok(())
754765
}),
755766
ReplicaAction::DeletePrefix { author, key, reply } => {
@@ -769,7 +780,10 @@ impl Actor {
769780
let mut replica = this
770781
.states
771782
.replica_if_syncing(&namespace, &mut this.store)?;
783+
let len = entry.content_len();
772784
replica.insert_remote_entry(entry, from, content_status)?;
785+
this.metrics.new_entries_remote.inc();
786+
this.metrics.new_entries_remote_size.inc_by(len);
773787
Ok(())
774788
}),
775789

src/engine.rs

+8 -1
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,8 @@ pub use self::{
2828
state::{Origin, SyncReason},
2929
};
3030
use crate::{
31-
actor::SyncHandle, Author, AuthorId, ContentStatus, ContentStatusCallback, Entry, NamespaceId,
31+
actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
32+
Entry, NamespaceId,
3233
};
3334

3435
mod gossip;
@@ -90,6 +91,7 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
9091
downloader,
9192
to_live_actor_recv,
9293
live_actor_tx.clone(),
94+
sync.metrics().clone(),
9395
);
9496
let actor_handle = tokio::task::spawn(
9597
async move {
@@ -155,6 +157,11 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
155157
&self.blob_store
156158
}
157159

160+
/// Returns the metrics tracked for this engine.
161+
pub fn metrics(&self) -> &Arc<Metrics> {
162+
self.sync.metrics()
163+
}
164+
158165
/// Start to sync a document.
159166
///
160167
/// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,

src/engine/live.rs

+21 -9
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
use std::{
44
collections::{HashMap, HashSet},
5+
sync::Arc,
56
time::SystemTime,
67
};
78

@@ -15,7 +16,6 @@ use iroh_blobs::{
1516
Hash, HashAndFormat,
1617
};
1718
use iroh_gossip::net::Gossip;
18-
use iroh_metrics::inc;
1919
use serde::{Deserialize, Serialize};
2020
use tokio::{
2121
sync::{self, mpsc, oneshot},
@@ -180,6 +180,7 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
180180

181181
/// Sync state per replica and peer
182182
state: NamespaceStates,
183+
metrics: Arc<Metrics>,
183184
}
184185
impl<B: iroh_blobs::store::Store> LiveActor<B> {
185186
/// Create the live actor.
@@ -192,6 +193,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
192193
downloader: Downloader,
193194
inbox: mpsc::Receiver<ToLiveActor>,
194195
sync_actor_tx: mpsc::Sender<ToLiveActor>,
196+
metrics: Arc<Metrics>,
195197
) -> Self {
196198
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
197199
let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
@@ -212,6 +214,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
212214
state: Default::default(),
213215
missing_hashes: Default::default(),
214216
queued_hashes: Default::default(),
217+
metrics,
215218
}
216219
}
217220

@@ -236,13 +239,13 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
236239
loop {
237240
i += 1;
238241
trace!(?i, "tick wait");
239-
inc!(Metrics, doc_live_tick_main);
242+
self.metrics.doc_live_tick_main.inc();
240243
tokio::select! {
241244
biased;
242245
msg = self.inbox.recv() => {
243246
let msg = msg.context("to_actor closed")?;
244247
trace!(?i, %msg, "tick: to_actor");
245-
inc!(Metrics, doc_live_tick_actor);
248+
self.metrics.doc_live_tick_actor.inc();
246249
match msg {
247250
ToLiveActor::Shutdown { reply } => {
248251
break Ok(reply);
@@ -254,28 +257,28 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
254257
}
255258
event = self.replica_events_rx.recv() => {
256259
trace!(?i, "tick: replica_event");
257-
inc!(Metrics, doc_live_tick_replica_event);
260+
self.metrics.doc_live_tick_replica_event.inc();
258261
let event = event.context("replica_events closed")?;
259262
if let Err(err) = self.on_replica_event(event).await {
260263
error!(?err, "Failed to process replica event");
261264
}
262265
}
263266
Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
264267
trace!(?i, "tick: running_sync_connect");
265-
inc!(Metrics, doc_live_tick_running_sync_connect);
268+
self.metrics.doc_live_tick_running_sync_connect.inc();
266269
let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
267270
self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
268271

269272
}
270273
Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
271274
trace!(?i, "tick: running_sync_accept");
272-
inc!(Metrics, doc_live_tick_running_sync_accept);
275+
self.metrics.doc_live_tick_running_sync_accept.inc();
273276
let res = res.context("running_sync_accept closed")?;
274277
self.on_sync_via_accept_finished(res).await;
275278
}
276279
Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
277280
trace!(?i, "tick: pending_downloads");
278-
inc!(Metrics, doc_live_tick_pending_downloads);
281+
self.metrics.doc_live_tick_pending_downloads.inc();
279282
let (namespace, hash, res) = res.context("pending_downloads closed")?;
280283
self.on_download_ready(namespace, hash, res).await;
281284
}
@@ -362,8 +365,16 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
362365
}
363366
let endpoint = self.endpoint.clone();
364367
let sync = self.sync.clone();
368+
let metrics = self.metrics.clone();
365369
let fut = async move {
366-
let res = connect_and_sync(&endpoint, &sync, namespace, NodeAddr::new(peer)).await;
370+
let res = connect_and_sync(
371+
&endpoint,
372+
&sync,
373+
namespace,
374+
NodeAddr::new(peer),
375+
Some(&metrics),
376+
)
377+
.await;
367378
(namespace, peer, reason, res)
368379
}
369380
.instrument(Span::current());
@@ -787,8 +798,9 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
787798
};
788799
debug!("incoming connection");
789800
let sync = self.sync.clone();
801+
let metrics = self.metrics.clone();
790802
self.running_sync_accept.spawn(
791-
async move { handle_connection(sync, conn, accept_request_cb).await }
803+
async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
792804
.instrument(Span::current()),
793805
);
794806
}

src/metrics.rs

+23 -59
Original file line number | Diff line number | Diff line change
@@ -1,85 +1,49 @@
11
//! Metrics for iroh-docs
22
3-
use iroh_metrics::{
4-
core::{Counter, Metric},
5-
struct_iterable::Iterable,
6-
};
3+
use iroh_metrics::{Counter, MetricsGroup};
74

85
/// Metrics for iroh-docs
9-
#[allow(missing_docs)]
10-
#[derive(Debug, Clone, Iterable)]
6+
#[derive(Debug, Default, MetricsGroup)]
117
pub struct Metrics {
8+
/// Number of document entries added locally
129
pub new_entries_local: Counter,
10+
/// Number of document entries added by peers
1311
pub new_entries_remote: Counter,
12+
/// Total size of entry contents added locally
1413
pub new_entries_local_size: Counter,
14+
/// Total size of entry contents added by peers
1515
pub new_entries_remote_size: Counter,
16-
pub sync_via_connect_success: Counter,
17-
pub sync_via_connect_failure: Counter,
16+
/// Number of successful syncs (via accept)
1817
pub sync_via_accept_success: Counter,
18+
/// Number of failed syncs (via accept)
1919
pub sync_via_accept_failure: Counter,
20+
/// Number of successful syncs (via connect)
21+
pub sync_via_connect_success: Counter,
22+
/// Number of failed syncs (via connect)
23+
pub sync_via_connect_failure: Counter,
2024

25+
/// Number of times the main actor loop ticked
2126
pub actor_tick_main: Counter,
2227

28+
/// Number of times the gossip actor loop ticked
2329
pub doc_gossip_tick_main: Counter,
30+
/// Number of times the gossip actor processed an event
2431
pub doc_gossip_tick_event: Counter,
32+
/// Number of times the gossip actor processed an actor event
2533
pub doc_gossip_tick_actor: Counter,
34+
/// Number of times the gossip actor processed a pending join
2635
pub doc_gossip_tick_pending_join: Counter,
2736

37+
/// Number of times the live actor loop ticked
2838
pub doc_live_tick_main: Counter,
39+
/// Number of times the live actor processed an actor event
2940
pub doc_live_tick_actor: Counter,
41+
/// Number of times the live actor processed a replica event
3042
pub doc_live_tick_replica_event: Counter,
43+
/// Number of times the live actor processed a running sync connect
3144
pub doc_live_tick_running_sync_connect: Counter,
45+
/// Number of times the live actor processed a running sync accept
3246
pub doc_live_tick_running_sync_accept: Counter,
47+
/// Number of times the live actor processed a pending download
3348
pub doc_live_tick_pending_downloads: Counter,
3449
}
35-
36-
impl Default for Metrics {
37-
fn default() -> Self {
38-
Self {
39-
new_entries_local: Counter::new("Number of document entries added locally"),
40-
new_entries_remote: Counter::new("Number of document entries added by peers"),
41-
new_entries_local_size: Counter::new("Total size of entry contents added locally"),
42-
new_entries_remote_size: Counter::new("Total size of entry contents added by peers"),
43-
sync_via_accept_success: Counter::new("Number of successful syncs (via accept)"),
44-
sync_via_accept_failure: Counter::new("Number of failed syncs (via accept)"),
45-
sync_via_connect_success: Counter::new("Number of successful syncs (via connect)"),
46-
sync_via_connect_failure: Counter::new("Number of failed syncs (via connect)"),
47-
48-
actor_tick_main: Counter::new("Number of times the main actor loop ticked"),
49-
50-
doc_gossip_tick_main: Counter::new("Number of times the gossip actor loop ticked"),
51-
doc_gossip_tick_event: Counter::new(
52-
"Number of times the gossip actor processed an event",
53-
),
54-
doc_gossip_tick_actor: Counter::new(
55-
"Number of times the gossip actor processed an actor event",
56-
),
57-
doc_gossip_tick_pending_join: Counter::new(
58-
"Number of times the gossip actor processed a pending join",
59-
),
60-
61-
doc_live_tick_main: Counter::new("Number of times the live actor loop ticked"),
62-
doc_live_tick_actor: Counter::new(
63-
"Number of times the live actor processed an actor event",
64-
),
65-
doc_live_tick_replica_event: Counter::new(
66-
"Number of times the live actor processed a replica event",
67-
),
68-
doc_live_tick_running_sync_connect: Counter::new(
69-
"Number of times the live actor processed a running sync connect",
70-
),
71-
doc_live_tick_running_sync_accept: Counter::new(
72-
"Number of times the live actor processed a running sync accept",
73-
),
74-
doc_live_tick_pending_downloads: Counter::new(
75-
"Number of times the live actor processed a pending download",
76-
),
77-
}
78-
}
79-
}
80-
81-
impl Metric for Metrics {
82-
fn name() -> &'static str {
83-
"iroh_docs"
84-
}
85-
}

0 commit comments

Comments
 (0)