diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index c7d7165e63c7..9f6516cb826f 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -25,6 +25,7 @@ use bytes::Bytes; use futures::FutureExt; use minitrace::prelude::*; +use crate::raw::oio::AppendOperation; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; use crate::raw::oio::WriteOperation; @@ -134,7 +135,7 @@ impl LayeredAccessor for MinitraceAccessor { type BlockingReader = MinitraceWrapper; type Writer = MinitraceWrapper; type BlockingWriter = MinitraceWrapper; - type Appender = A::Appender; + type Appender = MinitraceWrapper; type Pager = MinitraceWrapper; type BlockingPager = MinitraceWrapper; @@ -142,7 +143,7 @@ impl LayeredAccessor for MinitraceAccessor { &self.inner } - #[trace("metadata")] + #[trace] fn metadata(&self) -> AccessorInfo { self.inner.info() } @@ -152,24 +153,49 @@ impl LayeredAccessor for MinitraceAccessor { self.inner.create_dir(path, args).await } + #[trace(enter_on_poll = true)] async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let span = Span::enter_with_local_parent("read"); self.inner .read(path, args) - .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r)))) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r), + ) + }) + }) .await } + #[trace(enter_on_poll = true)] async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let span = Span::enter_with_local_parent("write"); self.inner .write(path, args) - .map(|v| v.map(|(rp, r)| (rp, MinitraceWrapper::new(span, r)))) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r), + ) + }) + }) .await } + #[trace(enter_on_poll = true)] async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { - self.inner.append(path, args).await + self.inner + .append(path, args) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + MinitraceWrapper::new(Span::enter_with_local_parent("AppendOperation"), r), + ) + }) + }) + .await } #[trace(enter_on_poll = true)] @@ -192,11 +218,18 @@ impl LayeredAccessor for MinitraceAccessor { self.inner.delete(path, args).await } + #[trace(enter_on_poll = true)] async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let span = Span::enter_with_local_parent("list"); self.inner .list(path, args) - .map(|v| v.map(|(rp, s)| (rp, MinitraceWrapper::new(span, s)))) + .map(|v| { + v.map(|(rp, s)| { + ( + rp, + MinitraceWrapper::new(Span::enter_with_local_parent("ListOperation"), s), + ) + }) + }) .await } @@ -215,22 +248,22 @@ impl LayeredAccessor for MinitraceAccessor { self.inner.blocking_create_dir(path, args) } + #[trace] fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let span = Span::enter_with_local_parent("blocking_read"); self.inner.blocking_read(path, args).map(|(rp, r)| { ( rp, - MinitraceWrapper::new(Span::enter_with_parent("ReadOperation", &span), r), + MinitraceWrapper::new(Span::enter_with_local_parent("ReadOperation"), r), ) }) } + #[trace] fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let span = Span::enter_with_local_parent("blocking_write"); self.inner.blocking_write(path, args).map(|(rp, r)| { ( rp, - MinitraceWrapper::new(Span::enter_with_parent("WriteOperation", &span), r), + MinitraceWrapper::new(Span::enter_with_local_parent("WriteOperation"), r), ) }) } @@ -255,12 +288,12 @@ impl LayeredAccessor for MinitraceAccessor { self.inner.blocking_delete(path, args) } + #[trace] fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { - let span = Span::enter_with_local_parent("blocking_list"); self.inner.blocking_list(path, args).map(|(rp, it)| { ( rp, - MinitraceWrapper::new(Span::enter_with_parent("PageOperation", &span), it), + MinitraceWrapper::new(Span::enter_with_local_parent("PageOperation"), it), ) }) } @@ -279,34 +312,40 @@ impl MinitraceWrapper { impl oio::Read for MinitraceWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let _span = Span::enter_with_parent(ReadOperation::Read.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ReadOperation::Read.into_static()); self.inner.poll_read(cx, buf) } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { - let _span = Span::enter_with_parent(ReadOperation::Seek.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ReadOperation::Seek.into_static()); self.inner.poll_seek(cx, pos) } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let _span = Span::enter_with_parent(ReadOperation::Next.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ReadOperation::Next.into_static()); self.inner.poll_next(cx) } } impl oio::BlockingRead for MinitraceWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { - let _span = Span::enter_with_parent(ReadOperation::BlockingRead.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static()); self.inner.read(buf) } fn seek(&mut self, pos: io::SeekFrom) -> Result { - let _span = Span::enter_with_parent(ReadOperation::BlockingSeek.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static()); self.inner.seek(pos) } fn next(&mut self) -> Option> { - let _span = Span::enter_with_parent(ReadOperation::BlockingNext.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static()); self.inner.next() } } @@ -356,18 +395,41 @@ impl oio::Write for MinitraceWrapper { impl oio::BlockingWrite for MinitraceWrapper { fn write(&mut self, bs: Bytes) -> Result<()> { - let _span = - Span::enter_with_parent(WriteOperation::BlockingWrite.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static()); self.inner.write(bs) } fn close(&mut self) -> Result<()> { - let _span = - Span::enter_with_parent(WriteOperation::BlockingClose.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingClose.into_static()); self.inner.close() } } +#[async_trait] +impl oio::Append for MinitraceWrapper { + async fn append(&mut self, bs: Bytes) -> Result<()> { + self.inner + .append(bs) + .in_span(Span::enter_with_parent( + AppendOperation::Append.into_static(), + &self.span, + )) + .await + } + + async fn close(&mut self) -> Result<()> { + self.inner + .close() + .in_span(Span::enter_with_parent( + AppendOperation::Close.into_static(), + &self.span, + )) + .await + } +} + #[async_trait] impl oio::Page for MinitraceWrapper { async fn next(&mut self) -> Result>> { @@ -383,7 +445,8 @@ impl oio::Page for MinitraceWrapper { impl oio::BlockingPage for MinitraceWrapper { fn next(&mut self) -> Result>> { - let _span = Span::enter_with_parent(PageOperation::BlockingNext.into_static(), &self.span); + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(PageOperation::BlockingNext.into_static()); self.inner.next() } }