Skip to content

Commit 3782a0b

Browse files
committed
Revert "Revert "Progress Bar Read APIs (#20937)" (#21115)"
This reverts commit ef335e2.
1 parent ef335e2 commit 3782a0b

File tree

11 files changed

+322
-87
lines changed

11 files changed

+322
-87
lines changed

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.fasterxml.jackson.databind.node.ObjectNode;
1818
import com.google.common.annotations.VisibleForTesting;
1919
import com.google.common.collect.Iterators;
20+
import com.google.common.collect.Lists;
2021
import com.google.common.collect.Sets;
2122
import com.google.common.collect.UnmodifiableIterator;
2223
import io.airbyte.commons.enums.Enums;
@@ -72,6 +73,7 @@
7273
import java.util.function.Supplier;
7374
import java.util.stream.Collectors;
7475
import java.util.stream.Stream;
76+
import org.apache.commons.lang3.StringUtils;
7577
import org.jooq.DSLContext;
7678
import org.jooq.Field;
7779
import org.jooq.InsertValuesStepN;
@@ -511,6 +513,72 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
511513
});
512514
}
513515

516+
@Override
517+
public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException {
518+
final var jobIdsStr = StringUtils.join(jobIds, ',');
519+
return jobDatabase.query(ctx -> {
520+
// Instead of one massive join query, separate this query into two queries for better readability
521+
// for now.
522+
// We can combine the queries at a later date if this still proves to be not efficient enough.
523+
final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx);
524+
hydrateStreamStats(jobIdsStr, ctx, attemptStats);
525+
return attemptStats;
526+
});
527+
}
528+
529+
private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) {
530+
final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>();
531+
final var syncResults = ctx.fetch(
532+
"SELECT atmpt.attempt_number, atmpt.job_id,"
533+
+ "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
534+
+ "FROM sync_stats stats "
535+
+ "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id "
536+
+ "WHERE job_id IN ( " + jobIdsStr + ");");
537+
syncResults.forEach(r -> {
538+
final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
539+
final var syncStats = new SyncStats()
540+
.withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED))
541+
.withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED))
542+
.withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS))
543+
.withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES));
544+
attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList()));
545+
});
546+
return attemptStats;
547+
}
548+
549+
/**
550+
* This method needed to be called after
551+
* {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats
552+
* has prepopulated the map.
553+
*/
554+
private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) {
555+
final var streamResults = ctx.fetch(
556+
"SELECT atmpt.attempt_number, atmpt.job_id, "
557+
+ "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
558+
+ "FROM stream_stats stats "
559+
+ "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id "
560+
+ "WHERE attempt_id IN "
561+
+ "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));");
562+
563+
streamResults.forEach(r -> {
564+
final var streamSyncStats = new StreamSyncStats()
565+
.withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE))
566+
.withStreamName(r.get(STREAM_STATS.STREAM_NAME))
567+
.withStats(new SyncStats()
568+
.withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED))
569+
.withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED))
570+
.withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS))
571+
.withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES)));
572+
573+
final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
574+
if (!attemptStats.containsKey(key)) {
575+
LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key);
576+
return;
577+
}
578+
attemptStats.get(key).perStreamStats().add(streamSyncStats);
579+
});
580+
}
581+
514582
@Override
515583
public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
516584
return jobDatabase
@@ -849,7 +917,7 @@ private static Job getJobFromRecord(final Record record) {
849917

850918
private static Attempt getAttemptFromRecord(final Record record) {
851919
return new Attempt(
852-
record.get(ATTEMPT_NUMBER, Long.class),
920+
record.get(ATTEMPT_NUMBER, int.class),
853921
record.get(JOB_ID, Long.class),
854922
Path.of(record.get("log_path", String.class)),
855923
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public interface JobPersistence {
4949
*/
5050
record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {}
5151

52+
record JobAttemptPair(long id, int attemptNumber) {}
53+
5254
/**
5355
* Retrieve the combined and per stream stats for a single attempt.
5456
*
@@ -57,6 +59,19 @@ record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStat
5759
*/
5860
AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;
5961

62+
/**
63+
* Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to
64+
* avoid overloading the database with too many queries.
65+
* <p>
66+
* This implementation is intended to utilise complex joins under the hood to reduce the potential
67+
* N+1 database pattern.
68+
*
69+
* @param jobIds
70+
* @return
71+
* @throws IOException
72+
*/
73+
Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;
74+
6075
List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;
6176

6277
Job getJob(long jobId) throws IOException;

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
public class Attempt {
1515

16-
private final long id;
16+
private final int attemptNumber;
1717
private final long jobId;
1818
private final JobOutput output;
1919
private final AttemptStatus status;
@@ -23,7 +23,7 @@ public class Attempt {
2323
private final long createdAtInSecond;
2424
private final Long endedAtInSecond;
2525

26-
public Attempt(final long id,
26+
public Attempt(final int attemptNumber,
2727
final long jobId,
2828
final Path logPath,
2929
final @Nullable JobOutput output,
@@ -32,7 +32,7 @@ public Attempt(final long id,
3232
final long createdAtInSecond,
3333
final long updatedAtInSecond,
3434
final @Nullable Long endedAtInSecond) {
35-
this.id = id;
35+
this.attemptNumber = attemptNumber;
3636
this.jobId = jobId;
3737
this.output = output;
3838
this.status = status;
@@ -43,8 +43,8 @@ public Attempt(final long id,
4343
this.endedAtInSecond = endedAtInSecond;
4444
}
4545

46-
public long getId() {
47-
return id;
46+
public int getAttemptNumber() {
47+
return attemptNumber;
4848
}
4949

5050
public long getJobId() {
@@ -92,7 +92,7 @@ public boolean equals(final Object o) {
9292
return false;
9393
}
9494
final Attempt attempt = (Attempt) o;
95-
return id == attempt.id &&
95+
return attemptNumber == attempt.attemptNumber &&
9696
jobId == attempt.jobId &&
9797
updatedAtInSecond == attempt.updatedAtInSecond &&
9898
createdAtInSecond == attempt.createdAtInSecond &&
@@ -105,13 +105,13 @@ public boolean equals(final Object o) {
105105

106106
@Override
107107
public int hashCode() {
108-
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
108+
return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
109109
}
110110

111111
@Override
112112
public String toString() {
113113
return "Attempt{" +
114-
"id=" + id +
114+
"id=" + attemptNumber +
115115
", jobId=" + jobId +
116116
", output=" + output +
117117
", status=" + status +

0 commit comments

Comments
 (0)