Skip to content

Commit 6f136bf

Browse files
authored
refactor: replace more GAT-based async trait with RPITIT (#12271)
Signed-off-by: Bugen Zhao <[email protected]>
1 parent 1e2a4e5 commit 6f136bf

File tree

9 files changed

+243
-337
lines changed

9 files changed

+243
-337
lines changed

src/batch/src/exchange_source.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::fmt::Debug;
1616
use std::future::Future;
1717

1818
use risingwave_common::array::DataChunk;
19+
use risingwave_common::error::Result;
1920

2021
use crate::execution::grpc_exchange::GrpcExchangeSource;
2122
use crate::execution::local_exchange::LocalExchangeSource;
@@ -24,11 +25,7 @@ use crate::task::TaskId;
2425

2526
/// Each `ExchangeSource` maps to one task, it takes the execution result from task chunk by chunk.
2627
pub trait ExchangeSource: Send + Debug {
27-
type TakeDataFuture<'a>: Future<Output = risingwave_common::error::Result<Option<DataChunk>>>
28-
+ 'a
29-
where
30-
Self: 'a;
31-
fn take_data(&mut self) -> Self::TakeDataFuture<'_>;
28+
fn take_data(&mut self) -> impl Future<Output = Result<Option<DataChunk>>> + '_;
3229

3330
/// Get upstream task id.
3431
fn get_task_id(&self) -> TaskId;
@@ -42,9 +39,7 @@ pub enum ExchangeSourceImpl {
4239
}
4340

4441
impl ExchangeSourceImpl {
45-
pub(crate) async fn take_data(
46-
&mut self,
47-
) -> risingwave_common::error::Result<Option<DataChunk>> {
42+
pub(crate) async fn take_data(&mut self) -> Result<Option<DataChunk>> {
4843
match self {
4944
ExchangeSourceImpl::Grpc(grpc) => grpc.take_data().await,
5045
ExchangeSourceImpl::Local(local) => local.take_data().await,

src/batch/src/execution/grpc_exchange.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::fmt::{Debug, Formatter};
16-
use std::future::Future;
1716

1817
use futures::StreamExt;
1918
use risingwave_common::array::DataChunk;
@@ -73,26 +72,22 @@ impl Debug for GrpcExchangeSource {
7372
}
7473

7574
impl ExchangeSource for GrpcExchangeSource {
76-
type TakeDataFuture<'a> = impl Future<Output = Result<Option<DataChunk>>> + 'a;
77-
78-
fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
79-
async {
80-
let res = match self.stream.next().await {
81-
None => {
82-
return Ok(None);
83-
}
84-
Some(r) => r,
85-
};
86-
let task_data = res?;
87-
let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact();
88-
trace!(
89-
"Receiver taskOutput = {:?}, data = {:?}",
90-
self.task_output_id,
91-
data
92-
);
75+
async fn take_data(&mut self) -> Result<Option<DataChunk>> {
76+
let res = match self.stream.next().await {
77+
None => {
78+
return Ok(None);
79+
}
80+
Some(r) => r,
81+
};
82+
let task_data = res?;
83+
let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact();
84+
trace!(
85+
"Receiver taskOutput = {:?}, data = {:?}",
86+
self.task_output_id,
87+
data
88+
);
9389

94-
Ok(Some(data))
95-
}
90+
Ok(Some(data))
9691
}
9792

9893
fn get_task_id(&self) -> TaskId {

src/batch/src/execution/local_exchange.rs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::fmt::{Debug, Formatter};
16-
use std::future::Future;
1716

1817
use risingwave_common::array::DataChunk;
1918
use risingwave_common::error::Result;
@@ -52,23 +51,19 @@ impl Debug for LocalExchangeSource {
5251
}
5352

5453
impl ExchangeSource for LocalExchangeSource {
55-
type TakeDataFuture<'a> = impl Future<Output = Result<Option<DataChunk>>> + 'a;
56-
57-
fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
58-
async {
59-
let ret = self.task_output.direct_take_data().await?;
60-
if let Some(data) = ret {
61-
let data = data.compact();
62-
trace!(
63-
"Receiver task: {:?}, source task output: {:?}, data: {:?}",
64-
self.task_id,
65-
self.task_output.id(),
66-
data
67-
);
68-
Ok(Some(data))
69-
} else {
70-
Ok(None)
71-
}
54+
async fn take_data(&mut self) -> Result<Option<DataChunk>> {
55+
let ret = self.task_output.direct_take_data().await?;
56+
if let Some(data) = ret {
57+
let data = data.compact();
58+
trace!(
59+
"Receiver task: {:?}, source task output: {:?}, data: {:?}",
60+
self.task_id,
61+
self.task_output.id(),
62+
data
63+
);
64+
Ok(Some(data))
65+
} else {
66+
Ok(None)
7267
}
7368
}
7469

src/batch/src/executor/test_utils.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::collections::VecDeque;
16-
use std::future::Future;
1716

1817
use assert_matches::assert_matches;
1918
use futures::StreamExt;
@@ -246,15 +245,11 @@ impl FakeExchangeSource {
246245
}
247246

248247
impl ExchangeSource for FakeExchangeSource {
249-
type TakeDataFuture<'a> = impl Future<Output = Result<Option<DataChunk>>> + 'a;
250-
251-
fn take_data(&mut self) -> Self::TakeDataFuture<'_> {
252-
async {
253-
if let Some(chunk) = self.chunks.pop() {
254-
Ok(chunk)
255-
} else {
256-
Ok(None)
257-
}
248+
async fn take_data(&mut self) -> Result<Option<DataChunk>> {
249+
if let Some(chunk) = self.chunks.pop() {
250+
Ok(chunk)
251+
} else {
252+
Ok(None)
258253
}
259254
}
260255

src/batch/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#![feature(result_option_inspect)]
3535
#![feature(assert_matches)]
3636
#![feature(lazy_cell)]
37+
#![feature(return_position_impl_trait_in_trait)]
3738

3839
mod error;
3940
pub mod exchange_source;

src/connector/src/source/external.rs

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,9 @@ impl MySqlOffset {
200200
}
201201

202202
pub trait ExternalTableReader {
203-
type CdcOffsetFuture<'a>: Future<Output = ConnectorResult<CdcOffset>> + Send + 'a
204-
where
205-
Self: 'a;
206-
207203
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String;
208204

209-
fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_>;
205+
fn current_cdc_offset(&self) -> impl Future<Output = ConnectorResult<CdcOffset>> + Send + '_;
210206

211207
fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult<CdcOffset>;
212208

@@ -248,32 +244,28 @@ pub struct ExternalTableConfig {
248244
}
249245

250246
impl ExternalTableReader for MySqlExternalTableReader {
251-
type CdcOffsetFuture<'a> = impl Future<Output = ConnectorResult<CdcOffset>> + 'a;
252-
253247
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String {
254248
format!("`{}`", table_name.table_name)
255249
}
256250

257-
fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_> {
258-
async move {
259-
let mut conn = self
260-
.pool
261-
.get_conn()
262-
.await
263-
.map_err(|e| ConnectorError::Connection(anyhow!(e)))?;
264-
265-
let sql = "SHOW MASTER STATUS".to_string();
266-
let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
267-
let row = rs
268-
.iter_mut()
269-
.exactly_one()
270-
.map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?;
271-
272-
Ok(CdcOffset::MySql(MySqlOffset {
273-
filename: row.take("File").unwrap(),
274-
position: row.take("Position").unwrap(),
275-
}))
276-
}
251+
async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
252+
let mut conn = self
253+
.pool
254+
.get_conn()
255+
.await
256+
.map_err(|e| ConnectorError::Connection(anyhow!(e)))?;
257+
258+
let sql = "SHOW MASTER STATUS".to_string();
259+
let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
260+
let row = rs
261+
.iter_mut()
262+
.exactly_one()
263+
.map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?;
264+
265+
Ok(CdcOffset::MySql(MySqlOffset {
266+
filename: row.take("File").unwrap(),
267+
position: row.take("Position").unwrap(),
268+
}))
277269
}
278270

279271
fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult<CdcOffset> {
@@ -478,21 +470,17 @@ impl MySqlExternalTableReader {
478470
}
479471

480472
impl ExternalTableReader for ExternalTableReaderImpl {
481-
type CdcOffsetFuture<'a> = impl Future<Output = ConnectorResult<CdcOffset>> + 'a;
482-
483473
fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String {
484474
match self {
485475
ExternalTableReaderImpl::MySql(mysql) => mysql.get_normalized_table_name(table_name),
486476
ExternalTableReaderImpl::Mock(mock) => mock.get_normalized_table_name(table_name),
487477
}
488478
}
489479

490-
fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_> {
491-
async move {
492-
match self {
493-
ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
494-
ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
495-
}
480+
async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
481+
match self {
482+
ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
483+
ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
496484
}
497485
}
498486

src/connector/src/source/mock_external_table.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::future::Future;
1615
use std::sync::atomic::AtomicUsize;
1716

1817
use futures::stream::BoxStream;
@@ -91,24 +90,21 @@ impl MockExternalTableReader {
9190
}
9291

9392
impl ExternalTableReader for MockExternalTableReader {
94-
type CdcOffsetFuture<'a> = impl Future<Output = ConnectorResult<CdcOffset>> + 'a;
95-
9693
fn get_normalized_table_name(&self, _table_name: &SchemaTableName) -> String {
9794
"`mock_table`".to_string()
9895
}
9996

100-
fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_> {
97+
async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
10198
static IDX: AtomicUsize = AtomicUsize::new(0);
102-
async move {
103-
let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
104-
if idx < self.binlog_watermarks.len() {
105-
Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone()))
106-
} else {
107-
Ok(CdcOffset::MySql(MySqlOffset {
108-
filename: "1.binlog".to_string(),
109-
position: u64::MAX,
110-
}))
111-
}
99+
100+
let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
101+
if idx < self.binlog_watermarks.len() {
102+
Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone()))
103+
} else {
104+
Ok(CdcOffset::MySql(MySqlOffset {
105+
filename: "1.binlog".to_string(),
106+
position: u64::MAX,
107+
}))
112108
}
113109
}
114110

0 commit comments

Comments
 (0)