Skip to content

Commit 603ce61

Browse files
authored
fix(service/minitrace): should set local parent (#2620)
1 parent df79fb6 commit 603ce61

File tree

1 file changed

+89
-26
lines changed

1 file changed

+89
-26
lines changed

core/src/layers/minitrace.rs

Lines changed: 89 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use bytes::Bytes;
2525
use futures::FutureExt;
2626
use minitrace::prelude::*;
2727

28+
use crate::raw::oio::AppendOperation;
2829
use crate::raw::oio::PageOperation;
2930
use crate::raw::oio::ReadOperation;
3031
use crate::raw::oio::WriteOperation;
@@ -134,15 +135,15 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
134135
type BlockingReader = MinitraceWrapper<A::BlockingReader>;
135136
type Writer = MinitraceWrapper<A::Writer>;
136137
type BlockingWriter = MinitraceWrapper<A::BlockingWriter>;
137-
type Appender = A::Appender;
138+
type Appender = MinitraceWrapper<A::Appender>;
138139
type Pager = MinitraceWrapper<A::Pager>;
139140
type BlockingPager = MinitraceWrapper<A::BlockingPager>;
140141

141142
fn inner(&self) -> &Self::Inner {
142143
&self.inner
143144
}
144145

145-
#[trace("metadata")]
146+
#[trace]
146147
fn metadata(&self) -> AccessorInfo {
147148
self.inner.info()
148149
}
@@ -152,24 +153,49 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
152153
self.inner.create_dir(path, args).await
153154
}
154155

156+
#[trace(enter_on_poll = true)]
155157
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
156-
let span = Span::enter_with_local_parent("read");
157158
self.inner
158159
.read(path, args)
159-
.map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
160+
.map(|v| {
161+
v.map(|(rp, r)| {
162+
(
163+
rp,
164+
MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
165+
)
166+
})
167+
})
160168
.await
161169
}
162170

171+
#[trace(enter_on_poll = true)]
163172
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
164-
let span = Span::enter_with_local_parent("write");
165173
self.inner
166174
.write(path, args)
167-
.map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r))))
175+
.map(|v| {
176+
v.map(|(rp, r)| {
177+
(
178+
rp,
179+
MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
180+
)
181+
})
182+
})
168183
.await
169184
}
170185

186+
#[trace(enter_on_poll = true)]
171187
async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
172-
self.inner.append(path, args).await
188+
self.inner
189+
.append(path, args)
190+
.map(|v| {
191+
v.map(|(rp, r)| {
192+
(
193+
rp,
194+
MinitraceWrapper::new(Span::enter_with_local_parent("AppendOperation"), r),
195+
)
196+
})
197+
})
198+
.await
173199
}
174200

175201
#[trace(enter_on_poll = true)]
@@ -192,11 +218,18 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
192218
self.inner.delete(path, args).await
193219
}
194220

221+
#[trace(enter_on_poll = true)]
195222
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
196-
let span = Span::enter_with_local_parent("list");
197223
self.inner
198224
.list(path, args)
199-
.map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s))))
225+
.map(|v| {
226+
v.map(|(rp, s)| {
227+
(
228+
rp,
229+
MinitraceWrapper::new(Span::enter_with_local_parent("ListOperation"), s),
230+
)
231+
})
232+
})
200233
.await
201234
}
202235

@@ -215,22 +248,22 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
215248
self.inner.blocking_create_dir(path, args)
216249
}
217250

251+
#[trace]
218252
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
219-
let span = Span::enter_with_local_parent("blocking_read");
220253
self.inner.blocking_read(path, args).map(|(rp, r)| {
221254
(
222255
rp,
223-
MinitraceWrapper::new(Span::enter_with_parent("ReadOperation", &span), r),
256+
MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r),
224257
)
225258
})
226259
}
227260

261+
#[trace]
228262
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
229-
let span = Span::enter_with_local_parent("blocking_write");
230263
self.inner.blocking_write(path, args).map(|(rp, r)| {
231264
(
232265
rp,
233-
MinitraceWrapper::new(Span::enter_with_parent("WriteOperation", &span), r),
266+
MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r),
234267
)
235268
})
236269
}
@@ -255,12 +288,12 @@ impl<A: Accessor> LayeredAccessor for MinitraceAccessor<A> {
255288
self.inner.blocking_delete(path, args)
256289
}
257290

291+
#[trace]
258292
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
259-
let span = Span::enter_with_local_parent("blocking_list");
260293
self.inner.blocking_list(path, args).map(|(rp, it)| {
261294
(
262295
rp,
263-
MinitraceWrapper::new(Span::enter_with_parent("PageOperation", &span), it),
296+
MinitraceWrapper::new(Span::enter_with_local_parent("PageOperation"), it),
264297
)
265298
})
266299
}
@@ -279,34 +312,40 @@ impl<R> MinitraceWrapper<R> {
279312

280313
impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
281314
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
282-
let _span = Span::enter_with_parent(ReadOperation::Read.into_static(), &self.span);
315+
let _g = self.span.set_local_parent();
316+
let _span = LocalSpan::enter_with_local_parent(ReadOperation::Read.into_static());
283317
self.inner.poll_read(cx, buf)
284318
}
285319

286320
fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
287-
let _span = Span::enter_with_parent(ReadOperation::Seek.into_static(), &self.span);
321+
let _g = self.span.set_local_parent();
322+
let _span = LocalSpan::enter_with_local_parent(ReadOperation::Seek.into_static());
288323
self.inner.poll_seek(cx, pos)
289324
}
290325

291326
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
292-
let _span = Span::enter_with_parent(ReadOperation::Next.into_static(), &self.span);
327+
let _g = self.span.set_local_parent();
328+
let _span = LocalSpan::enter_with_local_parent(ReadOperation::Next.into_static());
293329
self.inner.poll_next(cx)
294330
}
295331
}
296332

297333
impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
298334
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
299-
let _span = Span::enter_with_parent(ReadOperation::BlockingRead.into_static(), &self.span);
335+
let _g = self.span.set_local_parent();
336+
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
300337
self.inner.read(buf)
301338
}
302339

303340
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
304-
let _span = Span::enter_with_parent(ReadOperation::BlockingSeek.into_static(), &self.span);
341+
let _g = self.span.set_local_parent();
342+
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static());
305343
self.inner.seek(pos)
306344
}
307345

308346
fn next(&mut self) -> Option<Result<Bytes>> {
309-
let _span = Span::enter_with_parent(ReadOperation::BlockingNext.into_static(), &self.span);
347+
let _g = self.span.set_local_parent();
348+
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static());
310349
self.inner.next()
311350
}
312351
}
@@ -356,18 +395,41 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
356395

357396
impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
358397
fn write(&mut self, bs: Bytes) -> Result<()> {
359-
let _span =
360-
Span::enter_with_parent(WriteOperation::BlockingWrite.into_static(), &self.span);
398+
let _g = self.span.set_local_parent();
399+
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
361400
self.inner.write(bs)
362401
}
363402

364403
fn close(&mut self) -> Result<()> {
365-
let _span =
366-
Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), &self.span);
404+
let _g = self.span.set_local_parent();
405+
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingClose.into_static());
367406
self.inner.close()
368407
}
369408
}
370409

410+
#[async_trait]
411+
impl<R: oio::Append> oio::Append for MinitraceWrapper<R> {
412+
async fn append(&mut self, bs: Bytes) -> Result<()> {
413+
self.inner
414+
.append(bs)
415+
.in_span(Span::enter_with_parent(
416+
AppendOperation::Append.into_static(),
417+
&self.span,
418+
))
419+
.await
420+
}
421+
422+
async fn close(&mut self) -> Result<()> {
423+
self.inner
424+
.close()
425+
.in_span(Span::enter_with_parent(
426+
AppendOperation::Close.into_static(),
427+
&self.span,
428+
))
429+
.await
430+
}
431+
}
432+
371433
#[async_trait]
372434
impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
373435
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
@@ -383,7 +445,8 @@ impl<R: oio::Page> oio::Page for MinitraceWrapper<R> {
383445

384446
impl<R: oio::BlockingPage> oio::BlockingPage for MinitraceWrapper<R> {
385447
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
386-
let _span = Span::enter_with_parent(PageOperation::BlockingNext.into_static(), &self.span);
448+
let _g = self.span.set_local_parent();
449+
let _span = LocalSpan::enter_with_local_parent(PageOperation::BlockingNext.into_static());
387450
self.inner.next()
388451
}
389452
}

0 commit comments

Comments
 (0)