Skip to content

Commit b235f68

Browse files
refactor: ExchangeWriter should report error throught status (risingwavelabs#8478)
1 parent 2db01f9 commit b235f68

File tree

3 files changed

+21
-13
lines changed

3 files changed

+21
-13
lines changed

src/batch/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub use anyhow::anyhow;
1818
use risingwave_common::array::ArrayError;
1919
use risingwave_common::error::{ErrorCode, RwError};
2020
use thiserror::Error;
21+
use tonic::Status;
2122

2223
use crate::error::BatchError::Internal;
2324

@@ -67,3 +68,9 @@ impl From<RwError> for BatchError {
6768
Internal(anyhow!(format!("{}", s)))
6869
}
6970
}
71+
72+
impl<'a> From<&'a BatchError> for Status {
73+
fn from(err: &'a BatchError) -> Self {
74+
Status::internal(err.to_string())
75+
}
76+
}

src/batch/src/rpc/service/exchange.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ use risingwave_common::error::{Result, ToRwResult};
1616
use risingwave_pb::task_service::GetDataResponse;
1717
use tonic::Status;
1818

19-
type ExchangeDataSender = tokio::sync::mpsc::Sender<std::result::Result<GetDataResponse, Status>>;
19+
pub type GetDataResponseResult = std::result::Result<GetDataResponse, Status>;
20+
21+
type ExchangeDataSender = tokio::sync::mpsc::Sender<GetDataResponseResult>;
2022

21-
#[async_trait::async_trait]
2223
pub trait ExchangeWriter: Send {
23-
async fn write(&mut self, resp: GetDataResponse) -> Result<()>;
24+
async fn write(&mut self, resp: GetDataResponseResult) -> Result<()>;
2425
}
2526

2627
pub struct GrpcExchangeWriter {
@@ -41,12 +42,11 @@ impl GrpcExchangeWriter {
4142
}
4243
}
4344

44-
#[async_trait::async_trait]
4545
impl ExchangeWriter for GrpcExchangeWriter {
46-
async fn write(&mut self, data: GetDataResponse) -> Result<()> {
46+
async fn write(&mut self, data: GetDataResponseResult) -> Result<()> {
4747
self.written_chunks += 1;
4848
self.sender
49-
.send(Ok(data))
49+
.send(data)
5050
.await
5151
.to_rw_result_with(|| "failed to write data to ExchangeWriter".into())
5252
}
@@ -62,7 +62,7 @@ mod tests {
6262
async fn test_exchange_writer() {
6363
let (tx, _rx) = tokio::sync::mpsc::channel(10);
6464
let mut writer = GrpcExchangeWriter::new(tx);
65-
writer.write(GetDataResponse::default()).await.unwrap();
65+
writer.write(Ok(GetDataResponse::default())).await.unwrap();
6666
assert_eq!(writer.written_chunks(), 1);
6767
}
6868

@@ -71,7 +71,7 @@ mod tests {
7171
let (tx, rx) = tokio::sync::mpsc::channel(10);
7272
drop(rx);
7373
let mut writer = GrpcExchangeWriter::new(tx);
74-
let res = writer.write(GetDataResponse::default()).await;
74+
let res = writer.write(Ok(GetDataResponse::default())).await;
7575
assert!(res.is_err());
7676
}
7777
}

src/batch/src/task/task_execution.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl TaskOutput {
190190
/// Return whether the data stream is finished.
191191
async fn take_data_inner(
192192
&mut self,
193-
writer: &mut dyn ExchangeWriter,
193+
writer: &mut impl ExchangeWriter,
194194
at_most_num: Option<usize>,
195195
) -> Result<bool> {
196196
let mut cnt: usize = 0;
@@ -212,15 +212,16 @@ impl TaskOutput {
212212
let resp = GetDataResponse {
213213
record_batch: Some(pb),
214214
};
215-
writer.write(resp).await?;
215+
writer.write(Ok(resp)).await?;
216216
}
217217
// Reached EOF
218218
Ok(None) => {
219219
break;
220220
}
221221
// Error happened
222222
Err(e) => {
223-
return Err(to_rw_error(e));
223+
writer.write(Err(tonic::Status::from(&*e))).await?;
224+
break;
224225
}
225226
}
226227
cnt += 1;
@@ -232,14 +233,14 @@ impl TaskOutput {
232233
/// Return whether the data stream is finished.
233234
pub async fn take_data_with_num(
234235
&mut self,
235-
writer: &mut dyn ExchangeWriter,
236+
writer: &mut impl ExchangeWriter,
236237
num: usize,
237238
) -> Result<bool> {
238239
self.take_data_inner(writer, Some(num)).await
239240
}
240241

241242
/// Take all data and write the data in serialized format to `ExchangeWriter`.
242-
pub async fn take_data(&mut self, writer: &mut dyn ExchangeWriter) -> Result<()> {
243+
pub async fn take_data(&mut self, writer: &mut impl ExchangeWriter) -> Result<()> {
243244
let finish = self.take_data_inner(writer, None).await?;
244245
assert!(finish);
245246
Ok(())

0 commit comments

Comments
 (0)