|
17 | 17 | import com.fasterxml.jackson.databind.node.ObjectNode;
|
18 | 18 | import com.google.common.annotations.VisibleForTesting;
|
19 | 19 | import com.google.common.collect.Iterators;
|
20 |
| -import com.google.common.collect.Lists; |
21 | 20 | import com.google.common.collect.Sets;
|
22 | 21 | import com.google.common.collect.UnmodifiableIterator;
|
23 | 22 | import io.airbyte.commons.enums.Enums;
|
|
73 | 72 | import java.util.function.Supplier;
|
74 | 73 | import java.util.stream.Collectors;
|
75 | 74 | import java.util.stream.Stream;
|
76 |
| -import org.apache.commons.lang3.StringUtils; |
77 | 75 | import org.jooq.DSLContext;
|
78 | 76 | import org.jooq.Field;
|
79 | 77 | import org.jooq.InsertValuesStepN;
|
@@ -513,72 +511,6 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
|
513 | 511 | });
|
514 | 512 | }
|
515 | 513 |
|
516 |
| - @Override |
517 |
| - public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException { |
518 |
| - final var jobIdsStr = StringUtils.join(jobIds, ','); |
519 |
| - return jobDatabase.query(ctx -> { |
520 |
| - // Instead of one massive join query, separate this query into two queries for better readability |
521 |
| - // for now. |
522 |
| - // We can combine the queries at a later date if this still proves to be not efficient enough. |
523 |
| - final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx); |
524 |
| - hydrateStreamStats(jobIdsStr, ctx, attemptStats); |
525 |
| - return attemptStats; |
526 |
| - }); |
527 |
| - } |
528 |
| - |
529 |
| - private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) { |
530 |
| - final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>(); |
531 |
| - final var syncResults = ctx.fetch( |
532 |
| - "SELECT atmpt.attempt_number, atmpt.job_id," |
533 |
| - + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " |
534 |
| - + "FROM sync_stats stats " |
535 |
| - + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " |
536 |
| - + "WHERE job_id IN ( " + jobIdsStr + ");"); |
537 |
| - syncResults.forEach(r -> { |
538 |
| - final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); |
539 |
| - final var syncStats = new SyncStats() |
540 |
| - .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) |
541 |
| - .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) |
542 |
| - .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) |
543 |
| - .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); |
544 |
| - attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); |
545 |
| - }); |
546 |
| - return attemptStats; |
547 |
| - } |
548 |
| - |
549 |
| - /** |
550 |
| - * This method needed to be called after |
551 |
| - * {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats |
552 |
| - * has prepopulated the map. |
553 |
| - */ |
554 |
| - private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) { |
555 |
| - final var streamResults = ctx.fetch( |
556 |
| - "SELECT atmpt.attempt_number, atmpt.job_id, " |
557 |
| - + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " |
558 |
| - + "FROM stream_stats stats " |
559 |
| - + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " |
560 |
| - + "WHERE attempt_id IN " |
561 |
| - + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); |
562 |
| - |
563 |
| - streamResults.forEach(r -> { |
564 |
| - final var streamSyncStats = new StreamSyncStats() |
565 |
| - .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) |
566 |
| - .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) |
567 |
| - .withStats(new SyncStats() |
568 |
| - .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) |
569 |
| - .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) |
570 |
| - .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) |
571 |
| - .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); |
572 |
| - |
573 |
| - final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); |
574 |
| - if (!attemptStats.containsKey(key)) { |
575 |
| - LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key); |
576 |
| - return; |
577 |
| - } |
578 |
| - attemptStats.get(key).perStreamStats().add(streamSyncStats); |
579 |
| - }); |
580 |
| - } |
581 |
| - |
582 | 514 | @Override
|
583 | 515 | public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
|
584 | 516 | return jobDatabase
|
@@ -917,7 +849,7 @@ private static Job getJobFromRecord(final Record record) {
|
917 | 849 |
|
918 | 850 | private static Attempt getAttemptFromRecord(final Record record) {
|
919 | 851 | return new Attempt(
|
920 |
| - record.get(ATTEMPT_NUMBER, int.class), |
| 852 | + record.get(ATTEMPT_NUMBER, Long.class), |
921 | 853 | record.get(JOB_ID, Long.class),
|
922 | 854 | Path.of(record.get("log_path", String.class)),
|
923 | 855 | record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
|
|
0 commit comments