Skip to content

Commit 3805915

Browse files
authored
fix: not bail! on kinesis's state if scan.startup.mode set to timestamp (#18038) (#18040)
Signed-off-by: tabVersion <[email protected]>
1 parent a26210d commit 3805915

File tree

1 file changed

+4
-35
lines changed
  • src/connector/src/source/kinesis/source

1 file changed

+4
-35
lines changed

src/connector/src/source/kinesis/source/reader.rs

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ impl SplitReader for KinesisSplitReader {
9191
if !matches!(start_position, KinesisOffset::Timestamp(_))
9292
&& properties.timestamp_offset.is_some()
9393
{
94-
bail!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp");
94+
// cannot bail! here because all new split readers will fail to start if user set 'scan.startup.mode' to 'timestamp'
95+
tracing::warn!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp, starting shard {} from the beginning",
96+
split.id()
97+
);
9598
}
9699

97100
let stream_name = properties.common.stream_name.clone();
@@ -309,40 +312,6 @@ mod tests {
309312
use crate::source::kinesis::split::KinesisSplit;
310313
use crate::source::SourceContext;
311314

312-
#[tokio::test]
313-
async fn test_reject_redundant_seq_props() {
314-
let properties = KinesisProperties {
315-
common: KinesisCommon {
316-
assume_role_arn: None,
317-
credentials_access_key: None,
318-
credentials_secret_access_key: None,
319-
stream_name: "kinesis_debug".to_string(),
320-
stream_region: "cn-northwest-1".to_string(),
321-
endpoint: None,
322-
session_token: None,
323-
assume_role_external_id: None,
324-
},
325-
326-
scan_startup_mode: None,
327-
timestamp_offset: Some(123456789098765432),
328-
329-
unknown_fields: Default::default(),
330-
};
331-
let client = KinesisSplitReader::new(
332-
properties,
333-
vec![KinesisSplit {
334-
shard_id: "shardId-000000000001".to_string().into(),
335-
start_position: KinesisOffset::Earliest,
336-
end_position: KinesisOffset::None,
337-
}],
338-
Default::default(),
339-
SourceContext::dummy().into(),
340-
None,
341-
)
342-
.await;
343-
assert!(client.is_err());
344-
}
345-
346315
#[tokio::test]
347316
#[ignore]
348317
async fn test_single_thread_kinesis_reader() -> Result<()> {

0 commit comments

Comments
 (0)