Skip to content

Commit 3e555bc

Browse files
authored
feat: distributed tracing (#46)
* feat: distributed tracing
* fix: instrument rpc client & p2p service
* fix: instrument some more
* cr
* cr
1 parent d101e5a commit 3e555bc

File tree

14 files changed

+172
-61
lines changed

14 files changed

+172
-61
lines changed

iroh-gateway/src/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};
44
use opentelemetry::trace::{TraceContextExt, TraceId};
55
use tracing_opentelemetry::OpenTelemetrySpanExt;
66

7-
pub fn metrics_config(logger_only: bool) -> iroh_metrics::Config {
7+
pub fn metrics_config(logger_only: bool) -> iroh_metrics::config::Config {
88
// compile time configuration
99
let service_name = env!("CARGO_PKG_NAME").to_string();
1010
let build = git_version!().to_string();
@@ -14,7 +14,7 @@ pub fn metrics_config(logger_only: bool) -> iroh_metrics::Config {
1414
let instance_id = std::env::var("IROH_INSTANCE_ID")
1515
.unwrap_or_else(|_| names::Generator::default().next().unwrap());
1616
let service_env = std::env::var("IROH_ENV").unwrap_or_else(|_| "dev".to_string());
17-
iroh_metrics::Config::new(
17+
iroh_metrics::config::Config::new(
1818
service_name,
1919
instance_id,
2020
build,

iroh-metrics/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ opentelemetry-otlp = { version = "0.10.0", features = ["grpc-sys"] }
1717
metrics = "0.18.1"
1818
metrics-util = "0.12"
1919
metrics-exporter-prometheus = { version = "0.9", features = ["push-gateway"]}
20-
metrics-exporter-log = "0.4.0"
20+
metrics-exporter-log = "0.4.0"
21+
tonic = "0.7.2"

iroh-metrics/src/config.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#[derive(Debug, Clone)]
2+
pub struct Config {
3+
/// The name of the service. Should be the same as the Cargo package name.
4+
pub service_name: String,
5+
/// A unique identifier for this instance of the service.
6+
pub instance_id: String,
7+
/// The build version of the service (commit hash).
8+
pub build: String,
9+
/// The version of the service. Should be the same as the Cargo package version.
10+
pub version: String,
11+
/// The environment of the service.
12+
pub service_env: String,
13+
/// Flag to enable debug mode.
14+
pub debug: bool,
15+
/// The endpoint of the trace collector.
16+
pub collector_endpoint: String,
17+
/// The endpoint of the prometheus push gateway.
18+
pub prometheus_gateway_endpoint: String,
19+
}
20+
21+
impl Config {
22+
pub fn new(
23+
service_name: String,
24+
instance_id: String,
25+
build: String,
26+
version: String,
27+
service_env: String,
28+
debug: bool,
29+
) -> Self {
30+
let debug: bool = std::env::var("IROH_METRICS_DEBUG")
31+
.ok()
32+
.and_then(|v| v.parse().ok())
33+
.unwrap_or(debug);
34+
let collector_endpoint = std::env::var("IROH_METRICS_COLLECTOR_ENDPOINT")
35+
.unwrap_or_else(|_| "http://localhost:4317".to_string());
36+
let prometheus_gateway_endpoint = std::env::var("IROH_METRICS_PROM_GATEWAY_ENDPOINT")
37+
.unwrap_or_else(|_| "http://localhost:9091".to_string());
38+
39+
Config {
40+
service_name,
41+
instance_id,
42+
build,
43+
version,
44+
service_env,
45+
debug,
46+
collector_endpoint,
47+
prometheus_gateway_endpoint,
48+
}
49+
}
50+
}

iroh-metrics/src/lib.rs

Lines changed: 9 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
pub mod config;
2+
pub mod req;
3+
4+
use config::Config;
15
use metrics_exporter_prometheus::PrometheusBuilder;
2-
use opentelemetry::sdk::{trace, Resource};
6+
use opentelemetry::{
7+
global,
8+
sdk::{propagation::TraceContextPropagator, trace, Resource},
9+
};
310
use opentelemetry_otlp::WithExportConfig;
411
use std::env::consts::{ARCH, OS};
512
use std::time::Duration;
@@ -34,6 +41,7 @@ pub fn init_tracer(cfg: Config) -> Result<(), Box<dyn std::error::Error>> {
3441
if cfg.debug {
3542
tracing_subscriber::registry().with(log_subscriber).init();
3643
} else {
44+
global::set_text_map_propagator(TraceContextPropagator::new());
3745
let tracer = opentelemetry_otlp::new_pipeline()
3846
.tracing()
3947
.with_exporter(
@@ -66,52 +74,3 @@ pub fn init_tracer(cfg: Config) -> Result<(), Box<dyn std::error::Error>> {
6674
pub fn shutdown_tracing() {
6775
opentelemetry::global::shutdown_tracer_provider();
6876
}
69-
70-
#[derive(Debug, Clone)]
71-
pub struct Config {
72-
/// The name of the service. Should be the same as the Cargo package name.
73-
pub service_name: String,
74-
/// A unique identifier for this instance of the service.
75-
pub instance_id: String,
76-
/// The build version of the service (commit hash).
77-
pub build: String,
78-
/// The version of the service. Should be the same as the Cargo package version.
79-
pub version: String,
80-
/// The environment of the service.
81-
pub service_env: String,
82-
/// Flag to enable debug mode.
83-
pub debug: bool,
84-
/// The endpoint of the trace collector.
85-
pub collector_endpoint: String,
86-
/// The endpoint of the prometheus push gateway.
87-
pub prometheus_gateway_endpoint: String,
88-
}
89-
90-
impl Config {
91-
pub fn new(
92-
service_name: String,
93-
instance_id: String,
94-
build: String,
95-
version: String,
96-
service_env: String,
97-
debug: bool,
98-
) -> Self {
99-
let debug =
100-
std::env::var("IROH_METRICS_DEBUG").unwrap_or_else(|_| debug.to_string()) == "true";
101-
let collector_endpoint = std::env::var("IROH_METRICS_COLLECTOR_ENDPOINT")
102-
.unwrap_or_else(|_| "http://localhost:4317".to_string());
103-
let prometheus_gateway_endpoint = std::env::var("IROH_METRICS_PROM_GATEWAY_ENDPOINT")
104-
.unwrap_or_else(|_| "http://localhost:9091".to_string());
105-
106-
Config {
107-
service_name,
108-
instance_id,
109-
build,
110-
version,
111-
service_env,
112-
debug,
113-
collector_endpoint,
114-
prometheus_gateway_endpoint,
115-
}
116-
}
117-
}

iroh-metrics/src/req.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use opentelemetry::{
2+
global,
3+
propagation::{Extractor, Injector},
4+
};
5+
use tonic::Request;
6+
use tracing_opentelemetry::OpenTelemetrySpanExt;
7+
8+
pub fn trace_tonic_req<T>(req: T) -> Request<T> {
9+
let mut req = Request::new(req);
10+
11+
global::get_text_map_propagator(|propagator| {
12+
propagator.inject_context(
13+
&tracing::Span::current().context(),
14+
&mut MutMetadataMap(req.metadata_mut()),
15+
)
16+
});
17+
18+
req
19+
}
20+
21+
pub fn set_trace_ctx<T>(req: &Request<T>) {
22+
let parent_cx =
23+
global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
24+
tracing::Span::current().set_parent(parent_cx);
25+
}
26+
27+
struct MetadataMap<'a>(pub &'a tonic::metadata::MetadataMap);
28+
struct MutMetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
29+
30+
impl<'a> Injector for MutMetadataMap<'a> {
31+
/// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
32+
fn set(&mut self, key: &str, value: String) {
33+
if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
34+
if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
35+
self.0.insert(key, val);
36+
}
37+
}
38+
}
39+
}
40+
41+
impl<'a> Extractor for MetadataMap<'a> {
42+
/// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None
43+
fn get(&self, key: &str) -> Option<&str> {
44+
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
45+
}
46+
47+
/// Collect all the keys from the MetadataMap.
48+
fn keys(&self) -> Vec<&str> {
49+
self.0
50+
.keys()
51+
.map(|key| match key {
52+
tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
53+
tonic::metadata::KeyRef::Binary(v) => v.as_str(),
54+
})
55+
.collect::<Vec<_>>()
56+
}
57+
}

iroh-p2p/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ iroh-rpc-client = { path = "../iroh-rpc-client" }
3232
iroh-util = { path = "../iroh-util" }
3333
rkyv = { version = "0.7.37", features = ["std", "alloc", "validation"] }
3434
tonic = "0.7.2"
35+
iroh-metrics = { path = "../iroh-metrics" }
36+
names = { version = "0.13.0", default-features = false }
37+
git-version = "0.3.5"
3538

3639
[dependencies.libp2p]
3740
version = "0.44"

iroh-p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod behaviour;
22
mod config;
3+
pub mod metrics;
34
pub mod rpc;
45
mod service;
56

iroh-p2p/src/main.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
1-
use iroh_p2p::Libp2pService;
1+
use iroh_p2p::{metrics, Libp2pService};
22
use libp2p::identity::{ed25519, Keypair};
33
use tokio::task;
44
use tracing::error;
5-
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
65

76
/// Starts daemon process
87
#[tokio::main(flavor = "multi_thread")]
98
async fn main() -> anyhow::Result<()> {
10-
tracing_subscriber::registry()
11-
.with(fmt::layer().pretty())
12-
.with(EnvFilter::from_default_env())
13-
.init();
9+
iroh_metrics::init(metrics::metrics_config(false)).expect("failed to initialize metrics");
1410

1511
let version = option_env!("IROH_VERSION").unwrap_or(env!("CARGO_PKG_VERSION"));
1612

@@ -41,5 +37,6 @@ async fn main() -> anyhow::Result<()> {
4137
// Cancel all async services
4238
p2p_task.abort();
4339

40+
iroh_metrics::shutdown_tracing();
4441
Ok(())
4542
}

iroh-p2p/src/metrics.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use git_version::git_version;
2+
3+
pub fn metrics_config(logger_only: bool) -> iroh_metrics::config::Config {
4+
// compile time configuration
5+
let service_name = env!("CARGO_PKG_NAME").to_string();
6+
let build = git_version!().to_string();
7+
let version = env!("CARGO_PKG_VERSION").to_string();
8+
9+
// runtime configuration
10+
let instance_id = std::env::var("IROH_INSTANCE_ID")
11+
.unwrap_or_else(|_| names::Generator::default().next().unwrap());
12+
let service_env = std::env::var("IROH_ENV").unwrap_or_else(|_| "dev".to_string());
13+
iroh_metrics::config::Config::new(
14+
service_name,
15+
instance_id,
16+
build,
17+
version,
18+
service_env,
19+
logger_only,
20+
)
21+
}

iroh-p2p/src/rpc.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ struct P2p {
2626
impl p2p_server::P2p for P2p {
2727
// TODO: expand to handle multiple cids at once. Probably not a tough fix, just want to push
2828
// forward right now
29+
#[tracing::instrument(skip(self, request))]
2930
async fn fetch_bitswap(
3031
&self,
3132
request: Request<BitswapRequest>,
3233
) -> Result<Response<BitswapResponse>, tonic::Status> {
34+
iroh_metrics::req::set_trace_ctx(&request);
3335
let req = request.into_inner();
3436
let cid = Cid::read_bytes(io::Cursor::new(req.cid))
3537
.map_err(|e| Status::invalid_argument(format!("invalid cid: {:?}", e)))?;
@@ -85,10 +87,12 @@ impl p2p_server::P2p for P2p {
8587
Ok(Response::new(BitswapResponse { data: block.data }))
8688
}
8789

90+
#[tracing::instrument(skip(self, request))]
8891
async fn fetch_provider(
8992
&self,
9093
request: Request<iroh_rpc_types::p2p::Key>,
9194
) -> Result<Response<Providers>, tonic::Status> {
95+
iroh_metrics::req::set_trace_ctx(&request);
9296
let req = request.into_inner();
9397
let (s, r) = oneshot::channel();
9498
let msg = RpcMessage::ProviderRequest {

iroh-resolver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ bytes = "1.1.0"
1717
iroh-rpc-client = { path = "../iroh-rpc-client" }
1818
tokio = { version = "1.18.0" }
1919
futures = "0.3.5"
20+
tracing = "0.1.34"
2021

2122
[dev-dependencies]
2223
criterion = { version = "0.3.5", features = ["async_tokio"] }

iroh-resolver/src/resolver.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ impl Resolver {
137137
}
138138

139139
/// Resolves through a given path, returning the [`Cid`] and raw bytes of the final leaf.
140+
#[tracing::instrument(skip(self))]
140141
pub async fn resolve(&self, path: Path) -> Result<Out> {
141142
// Resolve the root block.
142143
let (root_cid, root_bytes) = self.resolve_root(path.typ, &path.root).await?;
@@ -154,6 +155,7 @@ impl Resolver {
154155
}
155156

156157
/// Resolves through both DagPb and nested UnixFs DAGs.
158+
#[tracing::instrument(skip(self, bytes))]
157159
async fn resolve_dag_pb_or_unixfs(
158160
&self,
159161
cid: Cid,
@@ -186,6 +188,7 @@ impl Resolver {
186188
}
187189
}
188190

191+
#[tracing::instrument(skip(self, bytes))]
189192
async fn resolve_dag_pb(&self, cid: Cid, bytes: Bytes, path: Vec<String>) -> Result<Out> {
190193
let ipld: libipld::Ipld = libipld::IpldCodec::DagPb
191194
.decode(&bytes)
@@ -197,6 +200,7 @@ impl Resolver {
197200
Ok(Out::DagPb(out))
198201
}
199202

203+
#[tracing::instrument(skip(self, bytes))]
200204
async fn resolve_dag_cbor(&self, cid: Cid, bytes: Bytes, path: Vec<String>) -> Result<Out> {
201205
let ipld: libipld::Ipld = libipld::IpldCodec::DagCbor
202206
.decode(&bytes)
@@ -208,6 +212,7 @@ impl Resolver {
208212
Ok(Out::DagCbor(out))
209213
}
210214

215+
#[tracing::instrument(skip(self, bytes))]
211216
async fn resolve_dag_json(&self, cid: Cid, bytes: Bytes, path: Vec<String>) -> Result<Out> {
212217
let ipld: libipld::Ipld = libipld::IpldCodec::DagJson
213218
.decode(&bytes)
@@ -219,6 +224,7 @@ impl Resolver {
219224
Ok(Out::DagJson(out))
220225
}
221226

227+
#[tracing::instrument(skip(self))]
222228
async fn resolve_ipld(
223229
&self,
224230
_cid: Cid,
@@ -262,6 +268,7 @@ impl Resolver {
262268
Ok(current.clone())
263269
}
264270

271+
#[tracing::instrument(skip(self))]
265272
async fn resolve_root(&self, typ: PathType, root: &CidOrDomain) -> Result<(Cid, Bytes)> {
266273
match typ {
267274
PathType::Ipfs => match root {
@@ -279,6 +286,7 @@ impl Resolver {
279286
}
280287

281288
/// Loads the actual content of a given cid.
289+
#[tracing::instrument(skip(self))]
282290
async fn load_cid(&self, cid: &Cid) -> Result<Bytes> {
283291
// TODO: better strategies
284292
let providers = None;
@@ -287,6 +295,7 @@ impl Resolver {
287295
}
288296

289297
/// Resolves a dnslink at the given domain.
298+
#[tracing::instrument(skip(self))]
290299
async fn resolve_dnslink(&self, _domain: &str) -> Result<Cid> {
291300
todo!()
292301
}

iroh-rpc-client/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ prost = "0.10.3"
1717
anyhow = "1.0.57"
1818
bytes = "1.1.0"
1919
libp2p = "0.44.0"
20+
iroh-metrics = { path = "../iroh-metrics" }
21+
tracing = "0.1.34"

0 commit comments

Comments
 (0)