
refactor(error): dedicated error type for dml crate #14768


Merged · 3 commits · Jan 25, 2024
Changes from 1 commit
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions src/batch/src/error.rs
@@ -20,6 +20,7 @@ pub use anyhow::anyhow;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{BoxedError, ErrorCode, RwError};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_dml::error::DmlError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
@@ -101,6 +102,13 @@ pub enum BatchError {
BoxedError,
),

#[error(transparent)]
Dml(
#[from]
#[backtrace]
DmlError,
),

// Make the ref-counted type to be a variant for easier code structuring.
#[error(transparent)]
Shared(
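The `transparent` + `#[from]` combination is what lets batch code propagate a `DmlError` with `?` and no manual wrapping. Below is a minimal sketch of that mechanism, assuming thiserror 1.x and simplified stand-in types; the real variant also carries `#[backtrace]`, which relies on the nightly feature enabled in `src/dml/src/lib.rs` further down.

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum DmlError {
    #[error("table schema has changed, please try again later")]
    SchemaChanged,
}

#[derive(Error, Debug)]
enum BatchError {
    // `transparent` forwards `Display`/`source` to the inner error,
    // `from` generates `impl From<DmlError> for BatchError`.
    #[error(transparent)]
    Dml(#[from] DmlError),
}

fn write_rows() -> Result<(), DmlError> {
    Err(DmlError::SchemaChanged)
}

fn run_batch() -> Result<(), BatchError> {
    write_rows()?; // the generated `From` impl makes `?` convert automatically
    Ok(())
}

fn main() {
    // No extra prefix is added around the inner message.
    assert_eq!(
        run_batch().unwrap_err().to_string(),
        "table schema has changed, please try again later"
    );
}
```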
3 changes: 2 additions & 1 deletion src/dml/Cargo.toml
@@ -14,7 +14,6 @@ ignored = ["workspace-hack"]
normal = ["workspace-hack"]

[dependencies]
anyhow = "1"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
itertools = "0.12"
@@ -24,6 +23,8 @@ risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_pb = { workspace = true }
rw_futures_util = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] }
tracing = { version = "0.1" }

12 changes: 4 additions & 8 deletions src/dml/src/dml_manager.rs
@@ -17,14 +17,12 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, Weak};

use anyhow::Context;
use parking_lot::RwLock;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, TableId, TableVersionId};
use risingwave_common::error::Result;
use risingwave_common::transaction::transaction_id::{TxnId, TxnIdGenerator};
use risingwave_common::util::worker_util::WorkerNodeId;

use crate::error::{DmlError, Result};
use crate::{TableDmlHandle, TableDmlHandleRef};

pub type DmlManagerRef = Arc<DmlManager>;
@@ -112,9 +110,7 @@ impl DmlManager {
"dml handler registers with same version but different schema"
)
})
.with_context(|| {
format!("fail to register reader for table with key `{table_id:?}`")
})?,
.expect("the first dml executor is gone"), // this should never happen

// A new version of the table is activated, overwrite the old reader.
Ordering::Greater => new_handle!(o),
@@ -139,7 +135,7 @@ impl DmlManager {
// A new version of the table is activated, but the DML request is still on
// the old version.
Ordering::Less => {
bail!("schema changed for table `{table_id:?}`, please retry later")
return Err(DmlError::SchemaChanged);
}

// Write the chunk of correct version to the table.
@@ -155,7 +151,7 @@ impl DmlManager {
None => None,
}
}
.with_context(|| format!("no reader for dml in table `{table_id:?}`"))?;
.ok_or(DmlError::NoReader)?;

Ok(table_dml_handle)
}
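The `.ok_or(DmlError::NoReader)?` line above is the core of the change in this file: a missing reader becomes a typed variant rather than a formatted `anyhow` context string. A small self-contained sketch of the same shape, with a hypothetical `reader_for` lookup standing in for the real handle map:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
enum DmlError {
    #[error("no available table reader in streaming executors")]
    NoReader,
}

type Result<T> = std::result::Result<T, DmlError>;

// A missing entry maps to the typed `NoReader` variant via `Option::ok_or`,
// with no string formatting or allocation on the happy path.
fn reader_for(readers: &[(u32, &'static str)], table_id: u32) -> Result<&'static str> {
    readers
        .iter()
        .find(|(id, _)| *id == table_id)
        .map(|(_, name)| *name)
        .ok_or(DmlError::NoReader)
}

fn main() {
    assert!(reader_for(&[(1, "reader-1")], 1).is_ok());
    assert!(matches!(
        reader_for(&[(1, "reader-1")], 2),
        Err(DmlError::NoReader)
    ));
}
```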
28 changes: 28 additions & 0 deletions src/dml/src/error.rs
@@ -0,0 +1,28 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// The error type for DML operations.
#[derive(thiserror::Error, Debug)]
pub enum DmlError {
#[error("table schema has changed, please try again later")]
SchemaChanged,

#[error("no available table reader in streaming executors")]
NoReader,

#[error("table reader closed")]
ReaderClosed,
}

pub type Result<T> = std::result::Result<T, DmlError>;
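Because the failure modes are now enumerated, callers can match on variants instead of inspecting message strings. A hedged sketch of what that enables; the retry classification below is illustrative and not part of this PR:

```rust
use thiserror::Error;

// Same shape as the `DmlError` above, reproduced so the sketch is self-contained.
#[derive(Error, Debug)]
enum DmlError {
    #[error("table schema has changed, please try again later")]
    SchemaChanged,
    #[error("no available table reader in streaming executors")]
    NoReader,
    #[error("table reader closed")]
    ReaderClosed,
}

/// Illustrative policy: transient conditions can be retried,
/// a closed reader means the transaction is lost.
fn is_retryable(err: &DmlError) -> bool {
    matches!(err, DmlError::SchemaChanged | DmlError::NoReader)
}

fn main() {
    assert!(is_retryable(&DmlError::SchemaChanged));
    assert!(is_retryable(&DmlError::NoReader));
    assert!(!is_retryable(&DmlError::ReaderClosed));
}
```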
2 changes: 2 additions & 0 deletions src/dml/src/lib.rs
@@ -20,9 +20,11 @@
#![feature(type_alias_impl_trait)]
#![feature(box_patterns)]
#![feature(stmt_expr_attributes)]
#![feature(error_generic_member_access)]

pub use table::*;

pub mod dml_manager;
pub mod error;
mod table;
mod txn_channel;
25 changes: 11 additions & 14 deletions src/dml/src/table.rs
@@ -14,17 +14,16 @@

use std::sync::Arc;

use anyhow::{anyhow, Context};
use futures_async_stream::try_stream;
use parking_lot::RwLock;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::error::{Result, RwError};
use risingwave_common::transaction::transaction_id::TxnId;
use risingwave_common::transaction::transaction_message::TxnMsg;
use risingwave_connector::source::StreamChunkWithState;
use tokio::sync::oneshot;

use crate::error::{DmlError, Result};
use crate::txn_channel::{txn_channel, Receiver, Sender};

pub type TableDmlHandleRef = Arc<TableDmlHandle>;
@@ -89,17 +88,15 @@ impl TableDmlHandle {
loop {
let guard = self.core.read();
if guard.changes_txs.is_empty() {
return Err(RwError::from(anyhow!(
"no available table reader in streaming source executors"
)));
return Err(DmlError::NoReader);
}
let len = guard.changes_txs.len();
// Use session id instead of txn_id to choose channel so that we can preserve transaction order in the same session.
// PS: only hold if there's no scaling on the table.
let sender = guard
.changes_txs
.get((session_id % len as u32) as usize)
.context("no available table reader in streaming source executors")?
.unwrap()
.clone();

drop(guard);
@@ -183,7 +180,8 @@ impl WriteHandle {
pub async fn write_chunk(&self, chunk: StreamChunk) -> Result<()> {
assert_eq!(self.txn_state, TxnState::Begin);
// Ignore the notifier.
self.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk))
let _notifier = self
.write_txn_data_msg(TxnMsg::Data(self.txn_id, chunk))
.await?;
Ok(())
}
@@ -192,9 +190,8 @@
assert_eq!(self.txn_state, TxnState::Begin);
self.txn_state = TxnState::Committed;
// Await the notifier.
self.write_txn_control_msg(TxnMsg::End(self.txn_id))?
.await
.context("failed to wait the end message")?;
let notifier = self.write_txn_control_msg(TxnMsg::End(self.txn_id))?;
notifier.await.map_err(|_| DmlError::ReaderClosed)?;
Ok(())
}

@@ -221,7 +218,7 @@

// It's possible that the source executor is scaled in or migrated, so the channel
// is closed. To guarantee the transactional atomicity, bail out.
Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())),
Err(_) => Err(DmlError::ReaderClosed),
}
}

@@ -235,7 +232,7 @@

// It's possible that the source executor is scaled in or migrated, so the channel
// is closed. To guarantee the transactional atomicity, bail out.
Err(_) => Err(RwError::from("write txn_msg channel closed".to_string())),
Err(_) => Err(DmlError::ReaderClosed),
}
}
}
@@ -251,7 +248,7 @@ pub struct TableStreamReader {
}

impl TableStreamReader {
#[try_stream(boxed, ok = StreamChunkWithState, error = RwError)]
#[try_stream(boxed, ok = StreamChunkWithState, error = DmlError)]
pub async fn into_data_stream_for_test(mut self) {
while let Some((txn_msg, notifier)) = self.rx.recv().await {
// Notify about that we've taken the chunk.
@@ -267,7 +264,7 @@
}
}

#[try_stream(boxed, ok = TxnMsg, error = RwError)]
#[try_stream(boxed, ok = TxnMsg, error = DmlError)]
pub async fn into_stream(mut self) {
while let Some((txn_msg, notifier)) = self.rx.recv().await {
// Notify about that we've taken the chunk.
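The commit and write paths above all reduce to the same pattern: await a `oneshot` acknowledgement from the reader and map a dropped channel to `DmlError::ReaderClosed`. A runnable sketch of that pattern, assuming tokio with the `rt` and `macros` features; `wait_for_ack` is a made-up stand-in for `write_txn_control_msg`:

```rust
use thiserror::Error;
use tokio::sync::oneshot;

#[derive(Error, Debug)]
enum DmlError {
    #[error("table reader closed")]
    ReaderClosed,
}

// If the reader half was dropped (e.g. the executor was scaled in), the oneshot
// resolves to `RecvError`, which we translate into the domain-level variant.
async fn wait_for_ack(notifier: oneshot::Receiver<usize>) -> Result<usize, DmlError> {
    notifier.await.map_err(|_| DmlError::ReaderClosed)
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<usize>();
    drop(tx); // simulate the reader side going away mid-transaction
    assert!(matches!(wait_for_ack(rx).await, Err(DmlError::ReaderClosed)));
}
```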
19 changes: 12 additions & 7 deletions src/stream/src/executor/dml.rs
@@ -16,7 +16,7 @@ use std::collections::BTreeMap;
use std::mem;

use either::Either;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId};
@@ -106,16 +106,21 @@ impl DmlExecutor {
// Note(bugen): Only register after the first barrier message is received, which means the
// current executor is activated. This avoids the new reader overwriting the old one during
// the preparation of schema change.
let batch_reader = self
.dml_manager
.register_reader(self.table_id, self.table_version_id, &self.column_descs)
.map_err(StreamExecutorError::connector_error)?;
let batch_reader = batch_reader.stream_reader().into_stream();
let handle = self.dml_manager.register_reader(
self.table_id,
self.table_version_id,
&self.column_descs,
)?;
let reader = handle
.stream_reader()
.into_stream()
.map_err(StreamExecutorError::from)
.boxed();

// Merge the two streams using `StreamReaderWithPause` because when we receive a pause
// barrier, we should stop receiving the data from DML. We poll data from the two streams in
// a round robin way.
let mut stream = StreamReaderWithPause::<false, TxnMsg>::new(upstream, batch_reader);
let mut stream = StreamReaderWithPause::<false, TxnMsg>::new(upstream, reader);

// If the first barrier requires us to pause on startup, pause the stream.
if barrier.is_pause_on_startup() {
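Since the DML stream now yields `Result<_, DmlError>` instead of `RwError`, the executor lifts each error with `TryStreamExt::map_err` before merging with the barrier stream. A minimal sketch of that conversion with stand-in error types (futures 0.3, tokio runtime assumed):

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

#[derive(Debug)]
struct DmlError;

#[derive(Debug)]
struct StreamExecutorError;

impl From<DmlError> for StreamExecutorError {
    fn from(_: DmlError) -> Self {
        StreamExecutorError
    }
}

#[tokio::main]
async fn main() {
    // Stand-in for `handle.stream_reader().into_stream()`: items are `Result<_, DmlError>`.
    let dml_stream = stream::iter(vec![Ok::<u32, DmlError>(1), Err(DmlError)]);

    // Same shape as the executor code: lift errors into the executor's error
    // type, then box so the stream can be handed to `StreamReaderWithPause`.
    let mut reader = dml_stream.map_err(StreamExecutorError::from).boxed();

    assert!(matches!(reader.next().await, Some(Ok(1))));
    assert!(matches!(reader.next().await, Some(Err(StreamExecutorError))));
}
```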
7 changes: 4 additions & 3 deletions src/stream/src/executor/error.rs
@@ -17,6 +17,7 @@ use risingwave_common::error::{BoxedError, NotImplemented};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
use risingwave_dml::error::DmlError;
use risingwave_expr::ExprError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;
@@ -87,11 +88,11 @@ pub enum ErrorKind {
BoxedError,
),

#[error("Dml error: {0}")]
#[error(transparent)]
DmlError(
#[source]
#[from]
#[backtrace]
BoxedError,
DmlError,
),

#[error(transparent)]
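Switching the variant from a prefixed `"Dml error: {0}"` message to `#[error(transparent)]` changes how the error renders: the inner message is forwarded as-is instead of being wrapped again. A sketch of the difference, using a stand-in `Inner` error rather than the real `BoxedError`/`DmlError` types:

```rust
use thiserror::Error;

#[derive(Error, Debug)]
#[error("no available table reader in streaming executors")]
struct Inner;

// Old shape: a prefixed message around a source error.
#[derive(Error, Debug)]
enum OldKind {
    #[error("Dml error: {0}")]
    Dml(#[source] Inner),
}

// New shape: `transparent` forwards `Display` and `source()` to the inner error.
#[derive(Error, Debug)]
enum NewKind {
    #[error(transparent)]
    Dml(#[from] Inner),
}

fn main() {
    assert_eq!(
        OldKind::Dml(Inner).to_string(),
        "Dml error: no available table reader in streaming executors"
    );
    assert_eq!(
        NewKind::Dml(Inner).to_string(),
        "no available table reader in streaming executors"
    );
}
```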
5 changes: 3 additions & 2 deletions src/stream/src/executor/source/fetch_executor.rs
@@ -18,8 +18,8 @@ use std::ops::Bound;
use std::sync::Arc;

use either::Either;
use futures::pin_mut;
use futures::stream::{self, StreamExt};
use futures::{pin_mut, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnId, Schema, TableId};
use risingwave_common::hash::VnodeBitmapExt;
@@ -147,7 +147,8 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
*splits_on_fetch += batch.len();
let batch_reader =
Self::build_batched_stream_reader(column_ids, source_ctx, source_desc, Some(batch))
.await?;
.await?
.map_err(StreamExecutorError::connector_error);
stream.replace_data_stream(batch_reader);
}

8 changes: 5 additions & 3 deletions src/stream/src/executor/source/fs_source_executor.rs
@@ -20,7 +20,7 @@

use anyhow::anyhow;
use either::Either;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::Schema;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
@@ -196,7 +196,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
// Replace the source reader with a new one of the new state.
let reader = self
.build_stream_source_reader(source_desc, Some(target_state.clone()))
.await?;
.await?
.map_err(StreamExecutorError::connector_error);
stream.replace_data_stream(reader);

self.stream_source_core.stream_source_splits = target_state
@@ -337,7 +338,8 @@ impl<S: StateStore> FsSourceExecutor<S> {
let source_chunk_reader = self
.build_stream_source_reader(&source_desc, recover_state)
.instrument_await("fs_source_start_reader")
.await?;
.await?
.map_err(StreamExecutorError::connector_error);

// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
16 changes: 7 additions & 9 deletions src/stream/src/executor/source/list_executor.rs
@@ -17,14 +17,13 @@ use std::sync::Arc;

use anyhow::anyhow;
use either::Either;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::Op;
use risingwave_common::catalog::Schema;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_connector::source::filesystem::FsPageItem;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{BoxTryStream, SourceCtrlOpts};
use risingwave_connector::source::SourceCtrlOpts;
use risingwave_connector::ConnectorParams;
use risingwave_storage::StateStore;
use thiserror_ext::AsReport;
@@ -89,13 +88,12 @@ impl<S: StateStore> FsListExecutor<S> {
fn build_chunked_paginate_stream(
&self,
source_desc: &SourceDesc,
) -> StreamExecutorResult<BoxTryStream<StreamChunk>> {
let stream: std::pin::Pin<
Box<dyn Stream<Item = Result<FsPageItem, risingwave_common::error::RwError>> + Send>,
> = source_desc
) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<StreamChunk>>> {
let stream = source_desc
.source
.get_source_list()
.map_err(StreamExecutorError::connector_error)?;
.map_err(StreamExecutorError::connector_error)?
.map_err(StreamExecutorError::connector_error);

// Group FsPageItem stream into chunks of size 1024.
let chunked_stream = stream.chunks(CHUNK_SIZE).map(|chunk| {
@@ -120,7 +118,7 @@ impl<S: StateStore> FsListExecutor<S> {
))
});

Ok(chunked_stream.boxed())
Ok(chunked_stream)
}

#[try_stream(ok = Message, error = StreamExecutorError)]
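The return type change here swaps a `BoxTryStream` trait object for `impl Stream`, leaving boxing to the caller. A simplified sketch of a chunking stream with that signature; `chunked_paginate` and the chunk size of 2 are illustrative (the real code groups `FsPageItem`s by `CHUNK_SIZE`):

```rust
use futures::stream::{self, Stream, StreamExt};

// Returning `impl Stream<...>` keeps the concrete stream type unboxed;
// the caller can still call `.boxed()` on it when a trait object is needed.
fn chunked_paginate(pages: Vec<u32>) -> impl Stream<Item = Result<Vec<u32>, String>> {
    stream::iter(pages.into_iter().map(Ok::<u32, String>))
        // Group items into chunks, then fold each chunk of `Result`s
        // into a `Result` of a chunk.
        .chunks(2)
        .map(|chunk| chunk.into_iter().collect::<Result<Vec<u32>, String>>())
}

#[tokio::main]
async fn main() {
    let chunks: Vec<Result<Vec<u32>, String>> =
        chunked_paginate(vec![1, 2, 3]).collect().await;
    assert_eq!(chunks, vec![Ok(vec![1, 2]), Ok(vec![3])]);
}
```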