Skip to content

Commit 613d5d5

Browse files
committed
feat: add rejected records stats to connection status endpoints (#16973)
1 parent c0c3742 commit 613d5d5

File tree

8 files changed

+31
-8
lines changed

8 files changed

+31
-8
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13360,6 +13360,9 @@ components:
1336013360
recordsCommitted:
1336113361
type: integer
1336213362
format: int64
13363+
recordsRejected:
13364+
type: integer
13365+
format: int64
1336313366
bytesEmitted:
1336413367
type: integer
1336513368
format: int64
@@ -13425,6 +13428,9 @@ components:
1342513428
bytesCommitted:
1342613429
type: integer
1342713430
format: int64
13431+
recordsRejected:
13432+
type: integer
13433+
format: int64
1342813434
configType:
1342913435
$ref: "#/components/schemas/JobConfigType"
1343013436
ConnectionUptimeHistoryRead:
@@ -13441,6 +13447,7 @@ components:
1344113447
- configType
1344213448
- recordsEmitted
1344313449
- recordsCommitted
13450+
- recordsRejected
1344413451
- bytesEmitted
1344513452
- bytesCommitted
1344613453
properties:
@@ -13470,6 +13477,9 @@ components:
1347013477
bytesCommitted:
1347113478
type: integer
1347213479
format: int64
13480+
recordsRejected:
13481+
type: integer
13482+
format: int64
1347313483
ConnectionSyncResultRead:
1347413484
type: object
1347513485
required:

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1599,7 +1599,8 @@ public List<JobSyncResultRead> getConnectionDataHistory(final ConnectionDataHist
15991599
.bytesEmitted(aggregatedStats.getBytesEmitted())
16001600
.bytesCommitted(aggregatedStats.getBytesCommitted())
16011601
.recordsEmitted(aggregatedStats.getRecordsEmitted())
1602-
.recordsCommitted(aggregatedStats.getRecordsCommitted());
1602+
.recordsCommitted(aggregatedStats.getRecordsCommitted())
1603+
.recordsRejected(aggregatedStats.getRecordsRejected());
16031604
result.add(jobResult);
16041605
});
16051606

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/JobHistoryHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,8 @@ public ConnectionSyncProgressRead getConnectionSyncProgress(final ConnectionIdRe
415415
item.recordsEmitted(streamStats.getRecordsEmitted())
416416
.recordsCommitted(streamStats.getRecordsCommitted())
417417
.bytesEmitted(streamStats.getBytesEmitted())
418-
.bytesCommitted(streamStats.getBytesCommitted());
418+
.bytesCommitted(streamStats.getBytesCommitted())
419+
.recordsRejected(streamStats.getRecordsRejected());
419420
}
420421

421422
return item;
@@ -431,6 +432,7 @@ public ConnectionSyncProgressRead getConnectionSyncProgress(final ConnectionIdRe
431432
.bytesCommitted(aggregatedStats == null ? null : aggregatedStats.getBytesCommitted())
432433
.recordsEmitted(aggregatedStats == null ? null : aggregatedStats.getRecordsEmitted())
433434
.recordsCommitted(aggregatedStats == null ? null : aggregatedStats.getRecordsCommitted())
435+
.recordsRejected(aggregatedStats == null ? null : aggregatedStats.getRecordsRejected())
434436
.configType(runningJobConfigType)
435437
.streams(finalStreamsWithStats);
436438
}

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/ConnectionsHandlerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2625,6 +2625,7 @@ void testGetConnectionDataHistory() throws IOException {
26252625
final long jobTwoBytesEmmitted = 87654L;
26262626
final long jobTwoRecordsCommitted = 50L;
26272627
final long jobTwoRecordsEmittted = 60L;
2628+
final long jobTwoRecordsRejected = 10L;
26282629
try (final MockedStatic<StatsAggregationHelper> mockStatsAggregationHelper = Mockito.mockStatic(StatsAggregationHelper.class)) {
26292630
mockStatsAggregationHelper.when(() -> StatsAggregationHelper.getJobIdToJobWithAttemptsReadMap(Mockito.any(), Mockito.any()))
26302631
.thenReturn(Map.of(
@@ -2641,6 +2642,7 @@ jobTwoId, new JobWithAttemptsRead().job(
26412642
.bytesCommitted(jobTwoBytesCommitted)
26422643
.bytesEmitted(jobTwoBytesEmmitted)
26432644
.recordsCommitted(jobTwoRecordsCommitted)
2645+
.recordsRejected(jobTwoRecordsRejected)
26442646
.recordsEmitted(jobTwoRecordsEmittted)))));
26452647

26462648
final List<JobSyncResultRead> expected = List.of(
@@ -2660,6 +2662,7 @@ jobTwoId, new JobWithAttemptsRead().job(
26602662
.bytesEmitted(jobTwoBytesEmmitted)
26612663
.recordsCommitted(jobTwoRecordsCommitted)
26622664
.recordsEmitted(jobTwoRecordsEmittted)
2665+
.recordsRejected(jobTwoRecordsRejected)
26632666
.jobCreatedAt(jobTwoCreatedAt)
26642667
.jobUpdatedAt(jobTwoUpdatedAt));
26652668

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -734,12 +734,12 @@ void testGetConnectionSyncProgressWithRunningJob() throws IOException {
734734
new StreamDescriptor().name("stream3")));
735735

736736
jobRead.setStreamAggregatedStats(List.of(
737-
new StreamStats().streamName("stream2").recordsEmitted(50L).bytesEmitted(20L).recordsCommitted(45L).bytesCommitted(15L),
737+
new StreamStats().streamName("stream2").recordsEmitted(50L).bytesEmitted(20L).recordsCommitted(45L).bytesCommitted(15L).recordsRejected(2L),
738738
new StreamStats().streamName("stream1").streamNamespace("ns1").recordsEmitted(5L).bytesEmitted(2L).recordsCommitted(5L)
739739
.bytesCommitted(2L)));
740740

741741
final JobAggregatedStats jobAggregatedStats =
742-
new JobAggregatedStats().bytesCommitted(17L).recordsCommitted(50L).bytesEmitted(22L).recordsEmitted(55L);
742+
new JobAggregatedStats().bytesCommitted(17L).recordsCommitted(50L).bytesEmitted(22L).recordsEmitted(55L).recordsRejected(2L);
743743
jobRead.setAggregatedStats(jobAggregatedStats);
744744

745745
final JobWithAttemptsRead firstJobWithAttemptRead = new JobWithAttemptsRead()
@@ -755,6 +755,7 @@ void testGetConnectionSyncProgressWithRunningJob() throws IOException {
755755
.connectionId(connectionId)
756756
.bytesCommitted(jobAggregatedStats.getBytesCommitted())
757757
.recordsCommitted(jobAggregatedStats.getRecordsCommitted())
758+
.recordsRejected(jobAggregatedStats.getRecordsRejected())
758759
.bytesEmitted(jobAggregatedStats.getBytesEmitted())
759760
.recordsEmitted(jobAggregatedStats.getRecordsEmitted())
760761
.configType(JobConfigType.SYNC)
@@ -774,6 +775,7 @@ void testGetConnectionSyncProgressWithRunningJob() throws IOException {
774775
.bytesEmitted(20L)
775776
.recordsCommitted(45L)
776777
.bytesCommitted(15L)
778+
.recordsRejected(2L)
777779
.configType(JobConfigType.SYNC),
778780
new StreamSyncProgressReadItem()
779781
.streamName("stream3")

airbyte-server/src/main/kotlin/io/airbyte/server/handlers/StreamStatusesHandler.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ open class StreamStatusesHandler(
131131
.bytesCommitted(aggregatedStats.bytesCommitted)
132132
.recordsEmitted(aggregatedStats.recordsEmitted)
133133
.recordsCommitted(aggregatedStats.recordsCommitted)
134+
.recordsRejected(aggregatedStats.recordsRejected)
134135
result.add(jobResult)
135136
}
136137

airbyte-server/src/test/kotlin/io/airbyte/server/handlers/StreamStatusesHandlerTest.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ internal class StreamStatusesHandlerTest {
202202
val jobTwoBytesEmmitted = 87654L
203203
val jobTwoRecordsCommitted = 50L
204204
val jobTwoRecordsEmittted = 60L
205+
val jobTwoRecordsRejected = 10L
205206
Mockito.mockStatic(StatsAggregationHelper::class.java).use { mockStatsAggregationHelper ->
206207
mockStatsAggregationHelper
207208
.`when`<Any> {
@@ -228,13 +229,14 @@ internal class StreamStatusesHandlerTest {
228229
.bytesCommitted(jobTwoBytesCommitted)
229230
.bytesEmitted(jobTwoBytesEmmitted)
230231
.recordsCommitted(jobTwoRecordsCommitted)
231-
.recordsEmitted(jobTwoRecordsEmittted),
232+
.recordsEmitted(jobTwoRecordsEmittted)
233+
.recordsRejected(jobTwoRecordsRejected),
232234
),
233235
),
234236
),
235237
)
236238
val expected =
237-
List.of(
239+
listOf(
238240
JobSyncResultRead()
239241
.configType(JobConfigType.SYNC)
240242
.jobId(jobOneId)
@@ -245,7 +247,7 @@ internal class StreamStatusesHandlerTest {
245247
.jobCreatedAt(jobOneCreatedAt)
246248
.jobUpdatedAt(jobOneUpdatedAt)
247249
.streamStatuses(
248-
List.of<@Valid ConnectionSyncResultRead?>(
250+
listOf<@Valid ConnectionSyncResultRead?>(
249251
ConnectionSyncResultRead()
250252
.status(io.airbyte.api.model.generated.JobStatus.SUCCEEDED)
251253
.streamName("streamOne")
@@ -263,10 +265,11 @@ internal class StreamStatusesHandlerTest {
263265
.bytesEmitted(jobTwoBytesEmmitted)
264266
.recordsCommitted(jobTwoRecordsCommitted)
265267
.recordsEmitted(jobTwoRecordsEmittted)
268+
.recordsRejected(jobTwoRecordsRejected)
266269
.jobCreatedAt(jobTwoCreatedAt)
267270
.jobUpdatedAt(jobTwoUpdatedAt)
268271
.streamStatuses(
269-
List.of<@Valid ConnectionSyncResultRead?>(
272+
listOf<@Valid ConnectionSyncResultRead?>(
270273
ConnectionSyncResultRead()
271274
.status(io.airbyte.api.model.generated.JobStatus.SUCCEEDED)
272275
.streamName("streamThree")

airbyte-webapp/src/area/connection/components/HistoricalOverview/HistoricalOverview.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const generatePlaceholderHistory = (
5656
jobUpdatedAt: dayjs().unix(),
5757
recordsCommitted: connectionSyncProgress.recordsCommitted ?? 0,
5858
recordsEmitted: connectionSyncProgress.recordsEmitted ?? 0,
59+
recordsRejected: connectionSyncProgress.recordsRejected ?? 0,
5960
streamStatuses: connectionSyncProgress.streams.map((syncProgressItem) => {
6061
return {
6162
status: "running",

0 commit comments

Comments
 (0)