Skip to content

Commit 207d018

Browse files
committed
feat: connection manager workflow considers rejected records for progress (#16914)
1 parent 76e9006 commit 207d018

File tree

3 files changed

+48
-35
lines changed

3 files changed

+48
-35
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/helpers/ProgressCheckerPredicates.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,16 @@ public boolean test(@Nonnull final AttemptStats stats) {
2727
Objects.requireNonNull(stats, "Attempt stats null. Cannot test progress.");
2828
// If recordsCommitted is null, treat this as a 0
2929
final var recordsCommitted = Objects.requireNonNullElse(stats.getRecordsCommitted(), 0L);
30+
final var recordsRejected = Objects.requireNonNullElse(stats.getRecordsRejected(), 0L);
3031

31-
return recordsCommitted >= RECORDS_COMMITTED_THRESHOLD;
32+
// In the context of progress, we count both committed and rejected records since rejected records
33+
// are
34+
// records that were successfully sent to the destination and rejected by the destination due to the
35+
// records
36+
// not confirming to the destination requirements. They are being tracked as part of checkpointing
37+
// and we should
38+
// be continuing past those records to avoid getting stuck on those.
39+
return recordsCommitted + recordsRejected >= RECORDS_COMMITTED_THRESHOLD;
3240
}
3341

3442
}

airbyte-workers/src/test/java/io/airbyte/workers/helpers/ProgressCheckerPredicatesTest.java

Lines changed: 0 additions & 34 deletions
This file was deleted.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.helpers
6+
7+
import io.airbyte.api.client.model.generated.AttemptStats
8+
import org.junit.jupiter.api.Assertions
9+
import org.junit.jupiter.params.ParameterizedTest
10+
import org.junit.jupiter.params.provider.Arguments
11+
import org.junit.jupiter.params.provider.MethodSource
12+
import java.util.stream.Stream
13+
14+
internal class ProgressCheckerPredicatesTest {
15+
val checker: ProgressCheckerPredicates = ProgressCheckerPredicates()
16+
17+
@ParameterizedTest
18+
@MethodSource("statsProgressMatrix")
19+
fun checkAttemptStats(
20+
stats: AttemptStats?,
21+
expected: Boolean,
22+
) {
23+
Assertions.assertEquals(expected, checker.test(stats))
24+
}
25+
26+
companion object {
27+
@JvmStatic
28+
fun statsProgressMatrix(): Stream<Arguments> =
29+
Stream.of(
30+
Arguments.of(AttemptStats(recordsCommitted = 0L), false),
31+
Arguments.of(AttemptStats(recordsCommitted = 1L), true),
32+
Arguments.of(AttemptStats(recordsCommitted = 3L), true),
33+
Arguments.of(AttemptStats(recordsCommitted = 3L, recordsRejected = 2L), true),
34+
Arguments.of(AttemptStats(recordsRejected = 5L), true),
35+
Arguments.of(AttemptStats(recordsCommitted = 9999L), true),
36+
Arguments.of(AttemptStats(recordsCommitted = null), false),
37+
)
38+
}
39+
}

0 commit comments

Comments
 (0)