Skip to content

Commit fa9e754

Browse files
authored
refactor(error): dedicated error type for dml crate (#14768)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 7b900e0 commit fa9e754

14 files changed

+110
-75
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/batch/src/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub use anyhow::anyhow;
2020
use risingwave_common::array::ArrayError;
2121
use risingwave_common::error::{BoxedError, ErrorCode, RwError};
2222
use risingwave_common::util::value_encoding::error::ValueEncodingError;
23+
use risingwave_dml::error::DmlError;
2324
use risingwave_expr::ExprError;
2425
use risingwave_pb::PbFieldNotFound;
2526
use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
@@ -101,7 +102,15 @@ pub enum BatchError {
101102
BoxedError,
102103
),
103104

105+
#[error(transparent)]
106+
Dml(
107+
#[from]
108+
#[backtrace]
109+
DmlError,
110+
),
111+
104112
// Make the ref-counted type to be a variant for easier code structuring.
113+
// TODO(error-handling): replace with `thiserror_ext::Arc`
105114
#[error(transparent)]
106115
Shared(
107116
#[from]

src/dml/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ ignored = ["workspace-hack"]
1414
normal = ["workspace-hack"]
1515

1616
[dependencies]
17-
anyhow = "1"
1817
futures = { version = "0.3", default-features = false, features = ["alloc"] }
1918
futures-async-stream = { workspace = true }
2019
itertools = "0.12"
@@ -24,6 +23,8 @@ risingwave_common = { workspace = true }
2423
risingwave_connector = { workspace = true }
2524
risingwave_pb = { workspace = true }
2625
rw_futures_util = { workspace = true }
26+
thiserror = "1"
27+
thiserror-ext = { workspace = true }
2728
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] }
2829
tracing = { version = "0.1" }
2930

src/dml/src/dml_manager.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@ use std::collections::hash_map::Entry;
1717
use std::collections::HashMap;
1818
use std::sync::{Arc, Weak};
1919

20-
use anyhow::Context;
2120
use parking_lot::RwLock;
22-
use risingwave_common::bail;
2321
use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId};
24-
use risingwave_common::error::Result;
2522
use risingwave_common::transaction::transaction_id::{TxnId, TxnIdGenerator};
2623
use risingwave_common::util::worker_util::WorkerNodeId;
2724

25+
use crate::error::{DmlError, Result};
2826
use crate::{TableDmlHandle, TableDmlHandleRef};
2927

3028
pub type DmlManagerRef = Arc<DmlManager>;
@@ -112,9 +110,7 @@ impl DmlManager {
112110
"dml handler registers with same version but different schema"
113111
)
114112
})
115-
.with_context(|| {
116-
format!("fail to register reader for table with key `{table_id:?}`")
117-
})?,
113+
.expect("the first dml executor is gone"), // this should never happen
118114

119115
// A new version of the table is activated, overwrite the old reader.
120116
Ordering::Greater => new_handle!(o),
@@ -139,7 +135,7 @@ impl DmlManager {
139135
// A new version of the table is activated, but the DML request is still on
140136
// the old version.
141137
Ordering::Less => {
142-
bail!("schema changed for table `{table_id:?}`, please retry later")
138+
return Err(DmlError::SchemaChanged);
143139
}
144140

145141
// Write the chunk of correct version to the table.
@@ -155,7 +151,7 @@ impl DmlManager {
155151
None => None,
156152
}
157153
}
158-
.with_context(|| format!("no reader for dml in table `{table_id:?}`"))?;
154+
.ok_or(DmlError::NoReader)?;
159155

160156
Ok(table_dml_handle)
161157
}

src/dml/src/error.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2024 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/// The error type for DML operations.
16+
#[derive(thiserror::Error, Debug)]
17+
pub enum DmlError {
18+
#[error("table schema has changed, please try again later")]
19+
SchemaChanged,
20+
21+
#[error("no available table reader in streaming executors")]
22+
NoReader,
23+
24+
#[error("table reader closed")]
25+
ReaderClosed,
26+
}
27+
28+
pub type Result<T> = std::result::Result<T, DmlError>;

src/dml/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#![feature(type_alias_impl_trait)]
2121
#![feature(box_patterns)]
2222
#![feature(stmt_expr_attributes)]
23+
#![feature(error_generic_member_access)]
2324

2425
pub use table::*;
2526

2627
pub mod dml_manager;
28+
pub mod error;
2729
mod table;
2830
mod txn_channel;

src/dml/src/table.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,16 @@
1414

1515
use std::sync::Arc;
1616

17-
use anyhow::{anyhow, Context};
1817
use futures_async_stream::try_stream;
1918
use parking_lot::RwLock;
2019
use risingwave_common::array::StreamChunk;
2120
use risingwave_common::catalog::ColumnDesc;
22-
use risingwave_common::error::{Result, RwError};
2321
use risingwave_common::transaction::transaction_id::TxnId;
2422
use risingwave_common::transaction::transaction_message::TxnMsg;
2523
use risingwave_connector::source::StreamChunkWithState;
2624
use tokio::sync::oneshot;
2725

26+
use crate::error::{DmlError, Result};
2827
use crate::txn_channel::{txn_channel, Receiver, Sender};
2928

3029
pub type TableDmlHandleRef = Arc<TableDmlHandle>;
@@ -89,17 +88,15 @@ impl TableDmlHandle {
8988
loop {
9089
let guard = self.core.read();
9190
if guard.changes_txs.is_empty() {
92-
return Err(RwError::from(anyhow!(
93-
"no available table reader in streaming source executors"
94-
)));
91+
return Err(DmlError::NoReader);
9592
}
9693
let len = guard.changes_txs.len();
9794
// Use session id instead of txn_id to choose channel so that we can preserve transaction order in the same session.
9895
// PS: only hold if there's no scaling on the table.
9996
let sender = guard
10097
.changes_txs
10198
.get((session_id % len as u32) as usize)
102-
.context("no available table reader in streaming source executors")?
99+
.unwrap()
103100
.clone();
104101

105102
drop(guard);
@@ -183,7 +180,8 @@ impl WriteHandle {
183180
pub async fn write_chunk(&self, chunk: StreamChunk) -> Result<()> {
184181
assert_eq!(self.txn_state, TxnState::Begin);
185182
// Ignore the notifier.
186-
self.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk))
183+
let _notifier = self
184+
.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk))
187185
.await?;
188186
Ok(())
189187
}
@@ -192,9 +190,8 @@ impl WriteHandle {
192190
assert_eq!(self.txn_state, TxnState::Begin);
193191
self.txn_state = TxnState::Committed;
194192
// Await the notifier.
195-
self.write_txn_control_msg(TxnMsg::End(self.txn_id))?
196-
.await
197-
.context("failed to wait the end message")?;
193+
let notifier = self.write_txn_control_msg(TxnMsg::End(self.txn_id))?;
194+
notifier.await.map_err(|_| DmlError::ReaderClosed)?;
198195
Ok(())
199196
}
200197

@@ -221,7 +218,7 @@ impl WriteHandle {
221218

222219
// It's possible that the source executor is scaled in or migrated, so the channel
223220
// is closed. To guarantee the transactional atomicity, bail out.
224-
Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())),
221+
Err(_) => Err(DmlError::ReaderClosed),
225222
}
226223
}
227224

@@ -235,7 +232,7 @@ impl WriteHandle {
235232

236233
// It's possible that the source executor is scaled in or migrated, so the channel
237234
// is closed. To guarantee the transactional atomicity, bail out.
238-
Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())),
235+
Err(_) => Err(DmlError::ReaderClosed),
239236
}
240237
}
241238
}
@@ -251,7 +248,7 @@ pub struct TableStreamReader {
251248
}
252249

253250
impl TableStreamReader {
254-
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
251+
#[try_stream(boxed, ok = StreamChunkWithState, error = DmlError)]
255252
pub async fn into_data_stream_for_test(mut self) {
256253
while let Some((txn_msg, notifier)) = self.rx.recv().await {
257254
// Notify about that we've taken the chunk.
@@ -267,7 +264,7 @@ impl TableStreamReader {
267264
}
268265
}
269266

270-
#[try_stream(boxed, ok = TxnMsg, error = RwError)]
267+
#[try_stream(boxed, ok = TxnMsg, error = DmlError)]
271268
pub async fn into_stream(mut self) {
272269
while let Some((txn_msg, notifier)) = self.rx.recv().await {
273270
// Notify about that we've taken the chunk.

src/stream/src/executor/dml.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::collections::BTreeMap;
1616
use std::mem;
1717

1818
use either::Either;
19-
use futures::StreamExt;
19+
use futures::{StreamExt, TryStreamExt};
2020
use futures_async_stream::try_stream;
2121
use risingwave_common::array::StreamChunk;
2222
use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId};
@@ -106,16 +106,21 @@ impl DmlExecutor {
106106
// Note(bugen): Only register after the first barrier message is received, which means the
107107
// current executor is activated. This avoids the new reader overwriting the old one during
108108
// the preparation of schema change.
109-
let batch_reader = self
110-
.dml_manager
111-
.register_reader(self.table_id, self.table_version_id, &self.column_descs)
112-
.map_err(StreamExecutorError::connector_error)?;
113-
let batch_reader = batch_reader.stream_reader().into_stream();
109+
let handle = self.dml_manager.register_reader(
110+
self.table_id,
111+
self.table_version_id,
112+
&self.column_descs,
113+
)?;
114+
let reader = handle
115+
.stream_reader()
116+
.into_stream()
117+
.map_err(StreamExecutorError::from)
118+
.boxed();
114119

115120
// Merge the two streams using `StreamReaderWithPause` because when we receive a pause
116121
// barrier, we should stop receiving the data from DML. We poll data from the two streams in
117122
// a round robin way.
118-
let mut stream = StreamReaderWithPause::<false, TxnMsg>::new(upstream, batch_reader);
123+
let mut stream = StreamReaderWithPause::<false, TxnMsg>::new(upstream, reader);
119124

120125
// If the first barrier requires us to pause on startup, pause the stream.
121126
if barrier.is_pause_on_startup() {

src/stream/src/executor/error.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use risingwave_common::error::{BoxedError, NotImplemented};
1717
use risingwave_common::util::value_encoding::error::ValueEncodingError;
1818
use risingwave_connector::error::ConnectorError;
1919
use risingwave_connector::sink::SinkError;
20+
use risingwave_dml::error::DmlError;
2021
use risingwave_expr::ExprError;
2122
use risingwave_pb::PbFieldNotFound;
2223
use risingwave_rpc_client::error::RpcError;
@@ -87,11 +88,11 @@ pub enum ErrorKind {
8788
BoxedError,
8889
),
8990

90-
#[error("Dml error: {0}")]
91+
#[error(transparent)]
9192
DmlError(
92-
#[source]
93+
#[from]
9394
#[backtrace]
94-
BoxedError,
95+
DmlError,
9596
),
9697

9798
#[error(transparent)]

src/stream/src/executor/source/fetch_executor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use std::ops::Bound;
1818
use std::sync::Arc;
1919

2020
use either::Either;
21-
use futures::pin_mut;
2221
use futures::stream::{self, StreamExt};
22+
use futures::{pin_mut, TryStreamExt};
2323
use futures_async_stream::try_stream;
2424
use risingwave_common::catalog::{ColumnId, Schema, TableId};
2525
use risingwave_common::hash::VnodeBitmapExt;
@@ -147,7 +147,8 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
147147
*splits_on_fetch += batch.len();
148148
let batch_reader =
149149
Self::build_batched_stream_reader(column_ids, source_ctx, source_desc, Some(batch))
150-
.await?;
150+
.await?
151+
.map_err(StreamExecutorError::connector_error);
151152
stream.replace_data_stream(batch_reader);
152153
}
153154

src/stream/src/executor/source/fs_source_executor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020

2121
use anyhow::anyhow;
2222
use either::Either;
23-
use futures::StreamExt;
23+
use futures::{StreamExt, TryStreamExt};
2424
use futures_async_stream::try_stream;
2525
use risingwave_common::catalog::Schema;
2626
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
@@ -196,7 +196,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
196196
// Replace the source reader with a new one of the new state.
197197
let reader = self
198198
.build_stream_source_reader(source_desc, Some(target_state.clone()))
199-
.await?;
199+
.await?
200+
.map_err(StreamExecutorError::connector_error);
200201
stream.replace_data_stream(reader);
201202

202203
self.stream_source_core.stream_source_splits = target_state
@@ -337,7 +338,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
337338
let source_chunk_reader = self
338339
.build_stream_source_reader(&source_desc, recover_state)
339340
.instrument_await("fs_source_start_reader")
340-
.await?;
341+
.await?
342+
.map_err(StreamExecutorError::connector_error);
341343

342344
// Merge the chunks from source and the barriers into a single stream. We prioritize
343345
// barriers over source data chunks here.

src/stream/src/executor/source/list_executor.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@ use std::sync::Arc;
1717

1818
use anyhow::anyhow;
1919
use either::Either;
20-
use futures::StreamExt;
20+
use futures::{StreamExt, TryStreamExt};
2121
use futures_async_stream::try_stream;
2222
use risingwave_common::array::Op;
2323
use risingwave_common::catalog::Schema;
2424
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
25-
use risingwave_connector::source::filesystem::FsPageItem;
2625
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
27-
use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts};
26+
use risingwave_connector::source::SourceCtrlOpts;
2827
use risingwave_connector::ConnectorParams;
2928
use risingwave_storage::StateStore;
3029
use thiserror_ext::AsReport;
@@ -89,13 +88,12 @@ impl<S: StateStore> FsListExecutor<S> {
8988
fn build_chunked_paginate_stream(
9089
&self,
9190
source_desc: &SourceDesc,
92-
) -> StreamExecutorResult<BoxTryStream<StreamChunk>> {
93-
let stream: std::pin::Pin<
94-
Box<dyn Stream<Item = Result<FsPageItem, risingwave_common::error::RwError>> + Send>,
95-
> = source_desc
91+
) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<StreamChunk>>> {
92+
let stream = source_desc
9693
.source
9794
.get_source_list()
98-
.map_err(StreamExecutorError::connector_error)?;
95+
.map_err(StreamExecutorError::connector_error)?
96+
.map_err(StreamExecutorError::connector_error);
9997

10098
// Group FsPageItem stream into chunks of size 1024.
10199
let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| {
@@ -120,7 +118,7 @@ impl<S: StateStore> FsListExecutor<S> {
120118
))
121119
});
122120

123-
Ok(chunked_stream.boxed())
121+
Ok(chunked_stream)
124122
}
125123

126124
#[try_stream(ok = Message, error = StreamExecutorError)]

0 commit comments

Comments
 (0)