Skip to content

Commit db18306

Browse files
authored
fix: not bail! on kinesis if starts with timestamp (#18038) (#18604)
Signed-off-by: tabVersion <[email protected]>
1 parent 798d80f commit db18306

File tree

1 file changed

+4
-35
lines changed
  • src/connector/src/source/kinesis/source

1 file changed

+4
-35
lines changed

src/connector/src/source/kinesis/source/reader.rs

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ impl SplitReader for KinesisSplitReader {
9191
if !matches!(start_position, KinesisOffset::Timestamp(_))
9292
&& properties.timestamp_offset.is_some()
9393
{
94-
bail!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp");
94+
// cannot bail! here because all new split readers will fail to start if user set 'scan.startup.mode' to 'timestamp'
95+
tracing::warn!("scan.startup.mode need to be set to 'timestamp' if you want to start with a specific timestamp, starting shard {} from the beginning",
96+
split.id()
97+
);
9598
}
9699

97100
let stream_name = properties.common.stream_name.clone();
@@ -337,40 +340,6 @@ mod tests {
337340
use crate::connector_common::KinesisCommon;
338341
use crate::source::SourceContext;
339342

340-
#[tokio::test]
341-
async fn test_reject_redundant_seq_props() {
342-
let properties = KinesisProperties {
343-
common: KinesisCommon {
344-
assume_role_arn: None,
345-
credentials_access_key: None,
346-
credentials_secret_access_key: None,
347-
stream_name: "kinesis_debug".to_string(),
348-
stream_region: "cn-northwest-1".to_string(),
349-
endpoint: None,
350-
session_token: None,
351-
assume_role_external_id: None,
352-
},
353-
354-
scan_startup_mode: None,
355-
timestamp_offset: Some(123456789098765432),
356-
357-
unknown_fields: Default::default(),
358-
};
359-
let client = KinesisSplitReader::new(
360-
properties,
361-
vec![KinesisSplit {
362-
shard_id: "shardId-000000000001".to_string().into(),
363-
start_position: KinesisOffset::Earliest,
364-
end_position: KinesisOffset::None,
365-
}],
366-
Default::default(),
367-
SourceContext::dummy().into(),
368-
None,
369-
)
370-
.await;
371-
assert!(client.is_err());
372-
}
373-
374343
#[tokio::test]
375344
#[ignore]
376345
async fn test_single_thread_kinesis_reader() -> Result<()> {

0 commit comments

Comments
 (0)