Skip to content

Commit 20699d4

Browse files
authored
feat(subscription): support blocking cursor (#18675)
1 parent 00965ba commit 20699d4

File tree

15 files changed

+275
-137
lines changed

15 files changed

+275
-137
lines changed

e2e_test/subscription/main.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import subprocess
22
import psycopg2
3+
import threading
34
import time
45

56

@@ -274,7 +275,7 @@ def test_cursor_with_table_alter():
274275
row = execute_query("fetch next from cur",conn)
275276
check_rows_data([1,2],row[0],"Insert")
276277
row = execute_query("fetch next from cur",conn)
277-
assert(row == [])
278+
assert row == []
278279
row = execute_query("fetch next from cur",conn)
279280
check_rows_data([4,4,4],row[0],"Insert")
280281
execute_insert("insert into t1 values(5,5,5)",conn)
@@ -285,7 +286,7 @@ def test_cursor_with_table_alter():
285286
execute_insert("insert into t1 values(6,6)",conn)
286287
execute_insert("flush",conn)
287288
row = execute_query("fetch next from cur",conn)
288-
assert(row == [])
289+
assert row == []
289290
row = execute_query("fetch next from cur",conn)
290291
check_rows_data([6,6],row[0],"Insert")
291292
drop_table_subscription()
@@ -355,6 +356,52 @@ def test_rebuild_table():
355356
check_rows_data([1,100],row[2],"UpdateInsert")
356357
drop_table_subscription()
357358

359+
def test_block_cursor():
360+
print(f"test_block_cursor")
361+
create_table_subscription()
362+
conn = psycopg2.connect(
363+
host="localhost",
364+
port="4566",
365+
user="root",
366+
database="dev"
367+
)
368+
369+
execute_insert("declare cur subscription cursor for sub2 full",conn)
370+
execute_insert("insert into t2 values(1,1)",conn)
371+
execute_insert("flush",conn)
372+
execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
373+
execute_insert("flush",conn)
374+
start_time = time.time()
375+
row = execute_query("fetch 100 from cur with (timeout = '30s')",conn)
376+
assert (time.time() - start_time) < 3
377+
assert len(row) == 3
378+
check_rows_data([1,1],row[0],"Insert")
379+
check_rows_data([1,1],row[1],"UpdateDelete")
380+
check_rows_data([1,100],row[2],"UpdateInsert")
381+
382+
# Test block cursor fetches data successfully
383+
thread = threading.Thread(target=insert_into_table)
384+
thread.start()
385+
row = execute_query("fetch 100 from cur with (timeout = '5s')",conn)
386+
check_rows_data([10,10],row[0],"Insert")
387+
thread.join()
388+
389+
# Test block cursor timeout
390+
row = execute_query("fetch 100 from cur with (timeout = '5s')",conn)
391+
assert row == []
392+
393+
drop_table_subscription()
394+
395+
def insert_into_table():
396+
time.sleep(2)
397+
conn = psycopg2.connect(
398+
host="localhost",
399+
port="4566",
400+
user="root",
401+
database="dev"
402+
)
403+
execute_insert("insert into t2 values(10,10)",conn)
404+
358405
if __name__ == "__main__":
359406
test_cursor_snapshot()
360407
test_cursor_op()
@@ -366,3 +413,4 @@ def test_rebuild_table():
366413
test_cursor_with_table_alter()
367414
test_cursor_fetch_n()
368415
test_rebuild_table()
416+
test_block_cursor()

proto/hummock.proto

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,6 @@ service HummockManagerService {
835835
rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
836836
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
837837
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
838-
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
839838
rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
840839
rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse);
841840
}
@@ -907,13 +906,3 @@ message BranchedObject {
907906
// Compaction group id the SST belongs to.
908907
uint64 compaction_group_id = 3;
909908
}
910-
911-
message ListChangeLogEpochsRequest {
912-
uint32 table_id = 1;
913-
uint64 min_epoch = 2;
914-
uint32 max_count = 3;
915-
}
916-
917-
message ListChangeLogEpochsResponse {
918-
repeated uint64 epochs = 1;
919-
}

src/frontend/src/catalog/subscription_catalog.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use core::str::FromStr;
16-
1715
use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER};
18-
use risingwave_common::types::Interval;
1916
use risingwave_common::util::epoch::Epoch;
2017
use risingwave_pb::catalog::subscription::PbSubscriptionState;
2118
use risingwave_pb::catalog::PbSubscription;
22-
use thiserror_ext::AsReport;
2319

2420
use super::OwnedByUserCatalog;
2521
use crate::error::{ErrorCode, Result};
22+
use crate::handler::util::convert_interval_to_u64_seconds;
2623
use crate::WithOptions;
2724

2825
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -86,15 +83,7 @@ impl SubscriptionCatalog {
8683
let retention_seconds_str = properties.get("retention").ok_or_else(|| {
8784
ErrorCode::InternalError("Subscription retention time not set.".to_string())
8885
})?;
89-
let retention_seconds = (Interval::from_str(retention_seconds_str)
90-
.map_err(|err| {
91-
ErrorCode::InternalError(format!(
92-
"Retention needs to be set in Interval format: {:?}",
93-
err.to_report_string()
94-
))
95-
})?
96-
.epoch_in_micros()
97-
/ 1000000) as u64;
86+
let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
9887
self.retention_seconds = retention_seconds;
9988
Ok(())
10089
}

src/frontend/src/handler/fetch_cursor.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ use risingwave_sqlparser::ast::{FetchCursorStatement, Statement};
2222

2323
use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
2424
use super::query::BoundResult;
25+
use super::util::convert_interval_to_u64_seconds;
2526
use super::RwPgResponse;
2627
use crate::binder::BoundStatement;
2728
use crate::error::Result;
2829
use crate::handler::HandlerArgs;
29-
use crate::{Binder, PgResponseStream};
30+
use crate::{Binder, PgResponseStream, WithOptions};
3031

3132
pub async fn handle_fetch_cursor_execute(
3233
handler_args: HandlerArgs,
@@ -61,10 +62,31 @@ pub async fn handle_fetch_cursor(
6162
let (_, cursor_name) =
6263
Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?;
6364

65+
let with_options = WithOptions::try_from(stmt.with_properties.0.as_slice())?;
66+
67+
if with_options.len() > 1 {
68+
bail_not_implemented!("only `timeout` is supported in with options")
69+
}
70+
71+
let timeout_seconds = with_options
72+
.get("timeout")
73+
.map(convert_interval_to_u64_seconds)
74+
.transpose()?;
75+
76+
if with_options.len() == 1 && timeout_seconds.is_none() {
77+
bail_not_implemented!("only `timeout` is supported in with options")
78+
}
79+
6480
let cursor_manager = session.get_cursor_manager();
6581

6682
let (rows, pg_descs) = cursor_manager
67-
.get_rows_with_cursor(cursor_name, stmt.count, handler_args, formats)
83+
.get_rows_with_cursor(
84+
cursor_name,
85+
stmt.count,
86+
handler_args,
87+
formats,
88+
timeout_seconds,
89+
)
6890
.await?;
6991
Ok(build_fetch_cursor_response(rows, pg_descs))
7092
}

src/frontend/src/handler/util.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use core::str::FromStr;
1516
use std::pin::Pin;
1617
use std::sync::Arc;
1718
use std::task::{Context, Poll};
@@ -28,13 +29,16 @@ use pin_project_lite::pin_project;
2829
use risingwave_common::array::DataChunk;
2930
use risingwave_common::catalog::Field;
3031
use risingwave_common::row::Row as _;
31-
use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz};
32+
use risingwave_common::types::{
33+
write_date_time_tz, DataType, Interval, ScalarRefImpl, Timestamptz,
34+
};
3235
use risingwave_common::util::epoch::Epoch;
3336
use risingwave_common::util::iter_util::ZipEqFast;
3437
use risingwave_sqlparser::ast::{
3538
CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr,
3639
TableFactor, TableWithJoins,
3740
};
41+
use thiserror_ext::AsReport;
3842

3943
use crate::error::{ErrorCode, Result as RwResult};
4044
use crate::session::{current, SessionImpl};
@@ -238,6 +242,19 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 {
238242
Epoch::from(logstore_u64).as_unix_millis()
239243
}
240244

245+
pub fn convert_interval_to_u64_seconds(interval: &String) -> RwResult<u64> {
246+
let seconds = (Interval::from_str(interval)
247+
.map_err(|err| {
248+
ErrorCode::InternalError(format!(
249+
"Covert interval to u64 error, please check format, error: {:?}",
250+
err.to_report_string()
251+
))
252+
})?
253+
.epoch_in_micros()
254+
/ 1000000) as u64;
255+
Ok(seconds)
256+
}
257+
241258
#[cfg(test)]
242259
mod tests {
243260
use postgres_types::{ToSql, Type};

src/frontend/src/meta_client.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,6 @@ pub trait FrontendMetaClient: Send + Sync {
120120
rate_limit: Option<u32>,
121121
) -> Result<()>;
122122

123-
async fn list_change_log_epochs(
124-
&self,
125-
table_id: u32,
126-
min_epoch: u64,
127-
max_count: u32,
128-
) -> Result<Vec<u64>>;
129-
130123
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
131124

132125
async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
@@ -298,17 +291,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
298291
.map(|_| ())
299292
}
300293

301-
async fn list_change_log_epochs(
302-
&self,
303-
table_id: u32,
304-
min_epoch: u64,
305-
max_count: u32,
306-
) -> Result<Vec<u64>> {
307-
self.0
308-
.list_change_log_epochs(table_id, min_epoch, max_count)
309-
.await
310-
}
311-
312294
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
313295
self.0.get_cluster_recovery_status().await
314296
}

0 commit comments

Comments
 (0)