Skip to content

Commit e367090

Browse files
committed
feat: add rejected record count to timeline event (#16923)
1 parent ff9a4fc commit e367090

File tree

7 files changed

+68
-43
lines changed

7 files changed

+68
-43
lines changed

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1507,6 +1507,7 @@ public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody c
15071507
jobRead.getAttempts().stream()
15081508
.mapToLong(attempt -> attempt.getRecordsSynced() != null ? attempt.getRecordsSynced() : 0)
15091509
.sum(),
1510+
null,
15101511
job.getAttemptsCount(),
15111512
job.configType.name(),
15121513
job.status.name(),
@@ -1527,6 +1528,7 @@ public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody c
15271528
jobRead.getAttempts().stream()
15281529
.mapToLong(attempt -> attempt.getRecordsSynced() != null ? attempt.getRecordsSynced() : 0)
15291530
.sum(),
1531+
null,
15301532
job.getAttemptsCount(),
15311533
job.configType.name(),
15321534
job.status.name(),

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelper.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,16 @@ public UserReadInConnectionEvent getUserReadInConnectionEvent(final UUID userId,
143143
}
144144
}
145145

146-
record LoadedStats(long bytes, long records) {}
146+
record TimelineJobStats(long loadedBytes, long loadedRecords, long rejectedRecords) {}
147147

148148
@VisibleForTesting
149-
LoadedStats buildLoadedStats(final Job job, final List<AttemptStats> attemptStats) {
149+
TimelineJobStats buildTimelineJobStats(final Job job, final List<AttemptStats> attemptStats) {
150150
final var configuredCatalog = new JobConfigProxy(job.config).getConfiguredCatalog();
151151
final List<ConfiguredAirbyteStream> streams = configuredCatalog != null ? configuredCatalog.getStreams() : List.of();
152152

153153
long bytesLoaded = 0;
154154
long recordsLoaded = 0;
155+
long recordsRejected = 0;
155156

156157
for (final var stream : streams) {
157158
final AirbyteStream currentStream = stream.getStream();
@@ -165,25 +166,27 @@ LoadedStats buildLoadedStats(final Job job, final List<AttemptStats> attemptStat
165166
final StreamStatsRecord records = StatsAggregationHelper.getAggregatedStats(stream.getSyncMode(), streamStats);
166167
recordsLoaded += records.recordsCommitted();
167168
bytesLoaded += records.bytesCommitted();
169+
recordsRejected += records.recordsRejected();
168170
}
169171
}
170-
return new LoadedStats(bytesLoaded, recordsLoaded);
172+
return new TimelineJobStats(bytesLoaded, recordsLoaded, recordsRejected);
171173
}
172174

173175
public void logJobSuccessEventInConnectionTimeline(final Job job, final UUID connectionId, final List<AttemptStats> attemptStats) {
174176
try {
175-
final LoadedStats stats = buildLoadedStats(job, attemptStats);
177+
final TimelineJobStats stats = buildTimelineJobStats(job, attemptStats);
176178
final FinalStatusEvent event = new FinalStatusEvent(
177179
job.id,
178180
job.createdAtInSecond,
179181
job.updatedAtInSecond,
180-
stats.bytes,
181-
stats.records,
182+
stats.loadedBytes,
183+
stats.loadedRecords,
184+
stats.rejectedRecords,
182185
job.getAttemptsCount(),
183186
job.configType.name(),
184187
JobStatus.SUCCEEDED.name(),
185188
JobConverter.getStreamsAssociatedWithJob(job),
186-
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)); // TODO only include if there are rejected records
189+
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, stats.rejectedRecords));
187190
connectionTimelineEventService.writeEvent(connectionId, event, null);
188191
} catch (final Exception e) {
189192
LOGGER.error("Failed to persist timeline event for job: {}", job.id, e);
@@ -192,23 +195,24 @@ public void logJobSuccessEventInConnectionTimeline(final Job job, final UUID con
192195

193196
public void logJobFailureEventInConnectionTimeline(final Job job, final UUID connectionId, final List<AttemptStats> attemptStats) {
194197
try {
195-
final LoadedStats stats = buildLoadedStats(job, attemptStats);
198+
final TimelineJobStats stats = buildTimelineJobStats(job, attemptStats);
196199

197200
final Optional<AttemptFailureSummary> lastAttemptFailureSummary = job.getLastAttempt().flatMap(Attempt::getFailureSummary);
198201
final Optional<FailureReason> firstFailureReasonOfLastAttempt =
199202
lastAttemptFailureSummary.flatMap(summary -> summary.getFailures().stream().findFirst());
200-
final String jobEventFailureStatus = stats.bytes > 0 ? FinalStatus.INCOMPLETE.name() : FinalStatus.FAILED.name();
203+
final String jobEventFailureStatus = stats.loadedBytes > 0 ? FinalStatus.INCOMPLETE.name() : FinalStatus.FAILED.name();
201204
final FailedEvent event = new FailedEvent(
202205
job.id,
203206
job.createdAtInSecond,
204207
job.updatedAtInSecond,
205-
stats.bytes,
206-
stats.records,
208+
stats.loadedBytes,
209+
stats.loadedRecords,
210+
stats.rejectedRecords,
207211
job.getAttemptsCount(),
208212
job.configType.name(),
209213
jobEventFailureStatus,
210214
JobConverter.getStreamsAssociatedWithJob(job),
211-
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job), // TODO only include if there are rejected records
215+
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, stats.rejectedRecords),
212216
firstFailureReasonOfLastAttempt);
213217
connectionTimelineEventService.writeEvent(connectionId, event, null);
214218
} catch (final Exception e) {
@@ -219,19 +223,20 @@ public void logJobFailureEventInConnectionTimeline(final Job job, final UUID con
219223
public void logJobCancellationEventInConnectionTimeline(final Job job,
220224
final List<AttemptStats> attemptStats) {
221225
try {
222-
final LoadedStats stats = buildLoadedStats(job, attemptStats);
226+
final TimelineJobStats stats = buildTimelineJobStats(job, attemptStats);
223227
final UUID connectionId = UUID.fromString(job.scope);
224228
final FinalStatusEvent event = new FinalStatusEvent(
225229
job.id,
226230
job.createdAtInSecond,
227231
job.updatedAtInSecond,
228-
stats.bytes,
229-
stats.records,
232+
stats.loadedBytes,
233+
stats.loadedRecords,
234+
stats.rejectedRecords,
230235
job.getAttemptsCount(),
231236
job.configType.name(),
232237
io.airbyte.config.JobStatus.CANCELLED.name(),
233238
JobConverter.getStreamsAssociatedWithJob(job),
234-
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)); // TODO only include if there are rejected records
239+
connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, stats.rejectedRecords));
235240
connectionTimelineEventService.writeEvent(connectionId, event, getCurrentUserIdIfExist());
236241
} catch (final Exception e) {
237242
LOGGER.error("Failed to persist job cancelled event for job: {}", job.id, e);

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelperTest.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ void setup() {
8080
}
8181

8282
@Test
83-
void testGetLoadedStats() {
83+
void testGetTimelineJobStats() {
8484

8585
connectionTimelineEventHelper = new ConnectionTimelineEventHelper(Set.of(),
8686
currentUserService, organizationPersistence, permissionHandler, userPersistence, connectorObjectStorageService,
@@ -107,25 +107,25 @@ void testGetLoadedStats() {
107107
new Job(100L, SYNC, CONNECTION_ID.toString(), jobConfig, List.of(), JobStatus.SUCCEEDED, 0L, 0L, 0L, true);
108108

109109
/*
110-
* on a per stream basis, the stats are "users" -> (100L, 1L), (500L, 8L), (200L, 7L) "purchase" ->
111-
* (1000L, 10L), (5000L, 80L), (2000L, 70L) "vendor" -> (10000L, 100L), (50000L, 800L), (20000L,
112-
* 700L)
110+
* on a per stream basis, the stats are "users" -> (100L, 1L, 0L), (500L, 8L, 0L), (200L, 7L, 0L)
111+
* "purchase" -> (1000L, 10L, 1L), (5000L, 80L, 8L), (2000L, 70L, 7L) "vendor" -> (10000L, 100L,
112+
* 10L), (50000L, 800L, 80L), (20000L, 700L, 70L)
113113
*/
114114

115115
final List<Map<String, SyncStats>> perAttemptStreamStats = List.of(
116116
Map.of(
117-
userStreamName, new SyncStats().withBytesCommitted(100L).withRecordsCommitted(1L),
118-
purchaseStreamName, new SyncStats().withBytesCommitted(1000L).withRecordsCommitted(10L),
119-
vendorStreamName, new SyncStats().withBytesCommitted(10000L).withRecordsCommitted(100L)),
117+
userStreamName, new SyncStats().withBytesCommitted(100L).withRecordsCommitted(1L).withRecordsRejected(0L),
118+
purchaseStreamName, new SyncStats().withBytesCommitted(1000L).withRecordsCommitted(10L).withRecordsRejected(1L),
119+
vendorStreamName, new SyncStats().withBytesCommitted(10000L).withRecordsCommitted(100L).withRecordsRejected(10L)),
120120

121121
Map.of(
122-
userStreamName, new SyncStats().withBytesCommitted(500L).withRecordsCommitted(8L),
123-
purchaseStreamName, new SyncStats().withBytesCommitted(5000L).withRecordsCommitted(80L),
124-
vendorStreamName, new SyncStats().withBytesCommitted(50000L).withRecordsCommitted(800L)),
122+
userStreamName, new SyncStats().withBytesCommitted(500L).withRecordsCommitted(8L).withRecordsRejected(0L),
123+
purchaseStreamName, new SyncStats().withBytesCommitted(5000L).withRecordsCommitted(80L).withRecordsRejected(8L),
124+
vendorStreamName, new SyncStats().withBytesCommitted(50000L).withRecordsCommitted(800L).withRecordsRejected(80L)),
125125
Map.of(
126-
userStreamName, new SyncStats().withBytesCommitted(200L).withRecordsCommitted(7L),
127-
purchaseStreamName, new SyncStats().withBytesCommitted(2000L).withRecordsCommitted(70L),
128-
vendorStreamName, new SyncStats().withBytesCommitted(20000L).withRecordsCommitted(700L)));
126+
userStreamName, new SyncStats().withBytesCommitted(200L).withRecordsCommitted(7L).withRecordsRejected(0L),
127+
purchaseStreamName, new SyncStats().withBytesCommitted(2000L).withRecordsCommitted(70L).withRecordsRejected(7L),
128+
vendorStreamName, new SyncStats().withBytesCommitted(20000L).withRecordsCommitted(700L).withRecordsRejected(70L)));
129129

130130
final List<JobPersistence.AttemptStats> attemptStatsList = perAttemptStreamStats
131131
.stream().map(dict -> new JobPersistence.AttemptStats(
@@ -140,9 +140,11 @@ purchaseStreamName, new SyncStats().withBytesCommitted(2000L).withRecordsCommitt
140140
// across syncs
141141
final long expectedBytesLoaded = 200L + (1000L + 5000L + 2000L) + (10000L + 50000L + 20000L);
142142
final long expectedRecordsLoaded = 7L + (10L + 80L + 70L) + (100L + 800L + 700L);
143-
final var result = connectionTimelineEventHelper.buildLoadedStats(job, attemptStatsList);
144-
assertEquals(expectedBytesLoaded, result.bytes());
145-
assertEquals(expectedRecordsLoaded, result.records());
143+
final long expectedRecordsRejected = (1L + 8L + 7L) + (10L + 80L + 70L);
144+
final var result = connectionTimelineEventHelper.buildTimelineJobStats(job, attemptStatsList);
145+
assertEquals(expectedBytesLoaded, result.loadedBytes());
146+
assertEquals(expectedRecordsLoaded, result.loadedRecords());
147+
assertEquals(expectedRecordsRejected, result.rejectedRecords());
146148
}
147149

148150
@Nested

airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/FailedEvent.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,25 @@ class FailedEvent(
1717
endTimeEpochSeconds: Long,
1818
bytesLoaded: Long,
1919
recordsLoaded: Long,
20+
recordsRejected: Long? = null,
2021
attemptsCount: Int,
2122
jobType: String,
2223
statusType: String,
2324
streams: List<StreamDescriptor>? = null,
24-
rejectedRecords: RejectedRecordsMetadata? = null,
25+
rejectedRecordsMeta: RejectedRecordsMetadata? = null,
2526
private val failureReason: Optional<FailureReason>,
2627
) : FinalStatusEvent(
2728
jobId,
2829
startTimeEpochSeconds,
2930
endTimeEpochSeconds,
3031
bytesLoaded,
3132
recordsLoaded,
33+
recordsRejected,
3234
attemptsCount,
3335
jobType,
3436
statusType,
3537
streams,
36-
rejectedRecords,
38+
rejectedRecordsMeta,
3739
) {
3840
fun getFailureReason(): Optional<FailureReason> = failureReason
3941
}

airbyte-data/src/main/kotlin/io/airbyte/data/services/shared/FinalStatusEvent.kt

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,11 +18,12 @@ open class FinalStatusEvent(
1818
private val endTimeEpochSeconds: Long,
1919
private val bytesLoaded: Long,
2020
private val recordsLoaded: Long,
21+
private val recordsRejected: Long? = null,
2122
private val attemptsCount: Int,
2223
private val jobType: String,
2324
private val statusType: String,
2425
private val streams: List<StreamDescriptor>? = null,
25-
private val rejectedRecords: RejectedRecordsMetadata? = null,
26+
private val rejectedRecordsMeta: RejectedRecordsMetadata? = null,
2627
) : ConnectionEvent {
2728
fun getJobId(): Long = jobId
2829

@@ -36,7 +37,9 @@ open class FinalStatusEvent(
3637

3738
fun getAttemptsCount(): Int = attemptsCount
3839

39-
fun getRejectedRecords(): RejectedRecordsMetadata? = rejectedRecords
40+
fun getRecordsRejected(): Long? = recordsRejected
41+
42+
fun getRejectedRecordsMeta(): RejectedRecordsMetadata? = rejectedRecordsMeta
4043

4144
fun getStreams(): List<StreamDescriptor>? = streams
4245

airbyte-domain/services/src/main/kotlin/io/airbyte/domain/services/storage/ConnectorObjectStorageService.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ class ConnectorObjectStorageService(
3232
fun getRejectedRecordsForJob(
3333
connectionId: ConnectionId,
3434
job: Job,
35+
rejectedRecordCount: Long,
3536
): RejectedRecordsMetadata? {
37+
if (rejectedRecordCount <= 0) {
38+
return null
39+
}
40+
3641
val destinationVersion = getJobDestinationVersion(job)
3742
if (destinationVersion == null) {
3843
log.warn { "Job ${job.id} does not have a valid destination version, cannot retrieve rejected records metadata." }

airbyte-domain/services/src/test/kotlin/io/airbyte/domain/services/storage/ConnectorObjectStorageServiceTest.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class ConnectorObjectStorageServiceTest {
102102

103103
@Test
104104
fun `returns metadata for valid storage config`() {
105-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
105+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
106106

107107
assertNotNull(result)
108108
assertEquals("s3://my-bucket/my/path/$jobId", result!!.storageUri)
@@ -112,6 +112,12 @@ class ConnectorObjectStorageServiceTest {
112112
)
113113
}
114114

115+
@Test
116+
fun `returns null when rejected record count is zero`() {
117+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 0L)
118+
assertNull(result)
119+
}
120+
115121
@Test
116122
fun `returns null when job is not a sync job`() {
117123
val job =
@@ -122,15 +128,15 @@ class ConnectorObjectStorageServiceTest {
122128
},
123129
)
124130

125-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
131+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
126132

127133
assertNull(result)
128134
}
129135

130136
@Test
131137
fun `returns null when destination version is not found`() {
132138
every { actorDefinitionService.getActorDefinitionVersion(destinationVersionId) } throws ConfigNotFoundException("", "")
133-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
139+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
134140
assertNull(result)
135141
}
136142

@@ -139,7 +145,7 @@ class ConnectorObjectStorageServiceTest {
139145
val destinationVersion = ActorDefinitionVersion().withSupportsDataActivation(false)
140146
every { actorDefinitionService.getActorDefinitionVersion(destinationVersionId) } returns destinationVersion
141147

142-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
148+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
143149

144150
assertNull(result)
145151
}
@@ -155,7 +161,7 @@ class ConnectorObjectStorageServiceTest {
155161
every { destinationService.getDestinationConnection(destinationId) } returns destination
156162

157163
// When
158-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
164+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
159165

160166
// Then
161167
assertNull(result)
@@ -173,7 +179,7 @@ class ConnectorObjectStorageServiceTest {
173179
val destination = createDestinationWithStorageConfig(objectStorageConfig)
174180
every { destinationService.getDestinationConnection(destinationId) } returns destination
175181

176-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
182+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
177183

178184
assertNull(result)
179185
}
@@ -189,7 +195,7 @@ class ConnectorObjectStorageServiceTest {
189195
val destination = createDestinationWithStorageConfig(objectStorageConfig)
190196
every { destinationService.getDestinationConnection(destinationId) } returns destination
191197

192-
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job)
198+
val result = connectorObjectStorageService.getRejectedRecordsForJob(connectionId, job, 100L)
193199

194200
assertNull(result)
195201
}

0 commit comments

Comments
 (0)