Skip to content

Commit 8b29c74

Browse files
committed
add test
1 parent 5da1cad commit 8b29c74

File tree

2 files changed

+40
-1
lines changed

2 files changed

+40
-1
lines changed

src/connector/src/source/kinesis/enumerator/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ impl SplitEnumerator for KinesisSplitEnumerator {
7676
.into_iter()
7777
.map(|x| KinesisSplit {
7878
shard_id: x.shard_id().unwrap_or_default().to_string().into(),
79+
// handle start with position in reader part
7980
start_position: KinesisOffset::None,
8081
end_position: KinesisOffset::None,
8182
})

src/connector/src/source/kinesis/source/reader.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ impl SplitReader for KinesisSplitReader {
8888
},
8989
start_position => start_position.to_owned(),
9090
};
91-
if start_position != KinesisOffset::SequenceNumber && properties.seq_offset.is_some() {
91+
92+
if !matches!(start_position, KinesisOffset::SequenceNumber(_))
93+
&& properties.seq_offset.is_some()
94+
{
9295
return Err(
9396
anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number")
9497
);
@@ -280,6 +283,41 @@ mod tests {
280283
use crate::common::KinesisCommon;
281284
use crate::source::kinesis::split::KinesisSplit;
282285

286+
#[tokio::test]
287+
async fn test_reject_redundant_seq_props() {
288+
let properties = KinesisProperties {
289+
common: KinesisCommon {
290+
assume_role_arn: None,
291+
credentials_access_key: None,
292+
credentials_secret_access_key: None,
293+
stream_name: "kinesis_debug".to_string(),
294+
stream_region: "cn-northwest-1".to_string(),
295+
endpoint: None,
296+
session_token: None,
297+
assume_role_external_id: None,
298+
},
299+
300+
scan_startup_mode: None,
301+
seq_offset: Some(
302+
// redundant seq number
303+
"49629139817504901062972448413535783695568426186596941842".to_string(),
304+
),
305+
};
306+
let client = KinesisSplitReader::new(
307+
properties,
308+
vec![SplitImpl::Kinesis(KinesisSplit {
309+
shard_id: "shardId-000000000001".to_string().into(),
310+
start_position: KinesisOffset::Earliest,
311+
end_position: KinesisOffset::None,
312+
})],
313+
Default::default(),
314+
Default::default(),
315+
None,
316+
)
317+
.await;
318+
assert!(client.is_err());
319+
}
320+
283321
#[tokio::test]
284322
#[ignore]
285323
async fn test_single_thread_kinesis_reader() -> Result<()> {

0 commit comments

Comments
 (0)