|
17 | 17 | import com.fasterxml.jackson.databind.node.ObjectNode;
|
18 | 18 | import com.google.common.annotations.VisibleForTesting;
|
19 | 19 | import com.google.common.collect.Iterators;
|
| 20 | +import com.google.common.collect.Lists; |
20 | 21 | import com.google.common.collect.Sets;
|
21 | 22 | import com.google.common.collect.UnmodifiableIterator;
|
22 | 23 | import io.airbyte.commons.enums.Enums;
|
|
72 | 73 | import java.util.function.Supplier;
|
73 | 74 | import java.util.stream.Collectors;
|
74 | 75 | import java.util.stream.Stream;
|
| 76 | +import org.apache.commons.lang3.StringUtils; |
75 | 77 | import org.jooq.DSLContext;
|
76 | 78 | import org.jooq.Field;
|
77 | 79 | import org.jooq.InsertValuesStepN;
|
@@ -511,6 +513,72 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
|
511 | 513 | });
|
512 | 514 | }
|
513 | 515 |
|
| 516 | + @Override |
| 517 | + public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException { |
| 518 | + final var jobIdsStr = StringUtils.join(jobIds, ','); |
| 519 | + return jobDatabase.query(ctx -> { |
| 520 | + // Instead of one massive join query, separate this query into two queries for better readability |
| 521 | + // for now. |
| 522 | + // We can combine the queries at a later date if this still proves to be not efficient enough. |
| 523 | + final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx); |
| 524 | + hydrateStreamStats(jobIdsStr, ctx, attemptStats); |
| 525 | + return attemptStats; |
| 526 | + }); |
| 527 | + } |
| 528 | + |
| 529 | + private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) { |
| 530 | + final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>(); |
| 531 | + final var syncResults = ctx.fetch( |
| 532 | + "SELECT atmpt.attempt_number, atmpt.job_id," |
| 533 | + + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " |
| 534 | + + "FROM sync_stats stats " |
| 535 | + + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " |
| 536 | + + "WHERE job_id IN ( " + jobIdsStr + ");"); |
| 537 | + syncResults.forEach(r -> { |
| 538 | + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); |
| 539 | + final var syncStats = new SyncStats() |
| 540 | + .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) |
| 541 | + .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) |
| 542 | + .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) |
| 543 | + .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); |
| 544 | + attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); |
| 545 | + }); |
| 546 | + return attemptStats; |
| 547 | + } |
| 548 | + |
| 549 | + /** |
| 550 | + * This method needed to be called after |
| 551 | + * {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats |
| 552 | + * has prepopulated the map. |
| 553 | + */ |
| 554 | + private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) { |
| 555 | + final var streamResults = ctx.fetch( |
| 556 | + "SELECT atmpt.attempt_number, atmpt.job_id, " |
| 557 | + + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " |
| 558 | + + "FROM stream_stats stats " |
| 559 | + + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " |
| 560 | + + "WHERE attempt_id IN " |
| 561 | + + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); |
| 562 | + |
| 563 | + streamResults.forEach(r -> { |
| 564 | + final var streamSyncStats = new StreamSyncStats() |
| 565 | + .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) |
| 566 | + .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) |
| 567 | + .withStats(new SyncStats() |
| 568 | + .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) |
| 569 | + .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) |
| 570 | + .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) |
| 571 | + .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); |
| 572 | + |
| 573 | + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); |
| 574 | + if (!attemptStats.containsKey(key)) { |
| 575 | + LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key); |
| 576 | + return; |
| 577 | + } |
| 578 | + attemptStats.get(key).perStreamStats().add(streamSyncStats); |
| 579 | + }); |
| 580 | + } |
| 581 | + |
514 | 582 | @Override
|
515 | 583 | public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
|
516 | 584 | return jobDatabase
|
@@ -849,7 +917,7 @@ private static Job getJobFromRecord(final Record record) {
|
849 | 917 |
|
850 | 918 | private static Attempt getAttemptFromRecord(final Record record) {
|
851 | 919 | return new Attempt(
|
852 |
| - record.get(ATTEMPT_NUMBER, Long.class), |
| 920 | + record.get(ATTEMPT_NUMBER, int.class), |
853 | 921 | record.get(JOB_ID, Long.class),
|
854 | 922 | Path.of(record.get("log_path", String.class)),
|
855 | 923 | record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
|
|
0 commit comments