Skip to content

Commit 0b089f5

Browse files
tabVersionlmatz
authored and
lmatz
committed
fix: reject kinesis source if the start up mode is not sequence number but sequence number is provided (#12048)
1 parent c97cbdb commit 0b089f5

File tree

3 files changed

+55
-0
lines changed

3 files changed

+55
-0
lines changed

e2e_test/source/basic/kinesis.slt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# reject kinesis source if the start up mode is not sequence number but sequence number is provided
2+
statement error
3+
create source s ( a int, b varchar )
4+
with (
5+
connector = 'kinesis',
6+
stream = 'my_stream',
7+
scan.startup.sequence_number = '4950',
8+
aws.region = 'us-east-1',
9+
kinesis.credentials.access = 'my_access_key_id',
10+
kinesis.credentials.secret = 'my_secret_access_key',
11+
) format plain encode json;

src/connector/src/source/kinesis/enumerator/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ impl SplitEnumerator for KinesisSplitEnumerator {
7676
.into_iter()
7777
.map(|x| KinesisSplit {
7878
shard_id: x.shard_id().unwrap_or_default().to_string().into(),
79+
// handle start with position in reader part
7980
start_position: KinesisOffset::None,
8081
end_position: KinesisOffset::None,
8182
})

src/connector/src/source/kinesis/source/reader.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ impl SplitReader for KinesisSplitReader {
8989
start_position => start_position.to_owned(),
9090
};
9191

92+
if !matches!(start_position, KinesisOffset::SequenceNumber(_))
93+
&& properties.seq_offset.is_some()
94+
{
95+
return Err(
96+
anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number")
97+
);
98+
}
99+
92100
let stream_name = properties.common.stream_name.clone();
93101
let client = properties.common.build_client().await?;
94102

@@ -275,6 +283,41 @@ mod tests {
275283
use crate::common::KinesisCommon;
276284
use crate::source::kinesis::split::KinesisSplit;
277285

286+
#[tokio::test]
287+
async fn test_reject_redundant_seq_props() {
288+
let properties = KinesisProperties {
289+
common: KinesisCommon {
290+
assume_role_arn: None,
291+
credentials_access_key: None,
292+
credentials_secret_access_key: None,
293+
stream_name: "kinesis_debug".to_string(),
294+
stream_region: "cn-northwest-1".to_string(),
295+
endpoint: None,
296+
session_token: None,
297+
assume_role_external_id: None,
298+
},
299+
300+
scan_startup_mode: None,
301+
seq_offset: Some(
302+
// redundant seq number
303+
"49629139817504901062972448413535783695568426186596941842".to_string(),
304+
),
305+
};
306+
let client = KinesisSplitReader::new(
307+
properties,
308+
vec![SplitImpl::Kinesis(KinesisSplit {
309+
shard_id: "shardId-000000000001".to_string().into(),
310+
start_position: KinesisOffset::Earliest,
311+
end_position: KinesisOffset::None,
312+
})],
313+
Default::default(),
314+
Default::default(),
315+
None,
316+
)
317+
.await;
318+
assert!(client.is_err());
319+
}
320+
278321
#[tokio::test]
279322
#[ignore]
280323
async fn test_single_thread_kinesis_reader() -> Result<()> {

0 commit comments

Comments
 (0)